meter.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package metrics
  2. import (
  3. "time"
  4. )
  // Meter is the read/write surface of an event meter: callers record events
  // with Mark and read back the running total and several rates.
  //
  // NOTE(review): the window behind Rate1/Rate5/Rate15 is decided by the EWMA
  // constructors, which are defined outside this file; presumably 1, 5 and 15
  // minutes. Confirm against the EWMA implementation.
  type Meter interface {
  	// Mark records n events.
  	Mark(int64)

  	// Count returns the running total of marked events.
  	Count() int64

  	// Rate1, Rate5 and Rate15 return the three moving-average rates.
  	Rate1() float64
  	Rate5() float64
  	Rate15() float64

  	// RateMean returns the mean rate of events per second.
  	RateMean() float64
  }
  13. type meter struct {
  14. in chan int64
  15. out chan meterV
  16. reset chan bool
  17. ticker *time.Ticker
  18. }
  // meterV is the value snapshot the arbiter publishes on the out channel:
  // the running count plus the four rates exposed by the Meter interface.
  type meterV struct {
  	count    int64
  	rate1    float64
  	rate5    float64
  	rate15   float64
  	rateMean float64
  }
  23. func NewMeter() Meter {
  24. m := &meter{
  25. make(chan int64),
  26. make(chan meterV),
  27. make(chan bool),
  28. time.NewTicker(5e9),
  29. }
  30. go m.arbiter()
  31. return m
  32. }
  33. func (m *meter) Clear() {
  34. m.reset <- true
  35. }
  36. func (m *meter) Count() int64 {
  37. return (<-m.out).count
  38. }
  39. func (m *meter) Mark(n int64) {
  40. m.in <- n
  41. }
  42. func (m *meter) Rate1() float64 {
  43. return (<-m.out).rate1
  44. }
  45. func (m *meter) Rate5() float64 {
  46. return (<-m.out).rate5
  47. }
  48. func (m *meter) Rate15() float64 {
  49. return (<-m.out).rate15
  50. }
  51. func (m *meter) RateMean() float64 {
  52. return (<-m.out).rateMean
  53. }
  54. func (m *meter) arbiter() {
  55. var mv meterV
  56. a1 := NewEWMA1()
  57. a5 := NewEWMA5()
  58. a15 := NewEWMA15()
  59. tsStart := time.Nanoseconds()
  60. for {
  61. select {
  62. case n := <-m.in:
  63. mv.count += n
  64. a1.Update(n); mv.rate1 = a1.Rate()
  65. a5.Update(n); mv.rate5 = a5.Rate()
  66. a15.Update(n); mv.rate15 = a15.Rate()
  67. mv.rateMean = float64(1e9 * mv.count) / float64(
  68. time.Nanoseconds() - tsStart)
  69. case m.out <- mv:
  70. case <-m.reset:
  71. mv = meterV{}
  72. tsStart = time.Nanoseconds()
  73. case <-m.ticker.C:
  74. a1.Tick()
  75. a5.Tick()
  76. a15.Tick()
  77. }
  78. }
  79. }