meter.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package metrics
  2. import "time"
  // Meter is the public view of an event meter: callers record events with
  // Mark and read back the running total and several rates.
  type Meter interface {
  	// Mark records that the given number of events occurred.
  	Mark(int64)

  	// Count returns the running total of everything passed to Mark.
  	Count() int64

  	// RateMean returns the average rate over the meter's lifetime.
  	RateMean() float64

  	// Rate1, Rate5 and Rate15 return moving-average rates.
  	// NOTE(review): presumably the 1-, 5- and 15-minute averages; confirm
  	// against the EWMA implementation.
  	Rate1() float64
  	Rate5() float64
  	Rate15() float64
  }
  11. type meter struct {
  12. in chan int64
  13. out chan meterV
  14. reset chan bool
  15. ticker *time.Ticker
  16. }
  // meterV is the snapshot of a meter's values that the arbiter publishes
  // on the out channel.
  type meterV struct {
  	// count is the running total of marked events.
  	count int64

  	// Moving-average rates as last read from the three EWMAs.
  	rate1  float64
  	rate5  float64
  	rate15 float64

  	// rateMean is the events-per-second average since the start time.
  	rateMean float64
  }
  21. func NewMeter() Meter {
  22. m := &meter{
  23. make(chan int64),
  24. make(chan meterV),
  25. make(chan bool),
  26. time.NewTicker(5e9),
  27. }
  28. go m.arbiter()
  29. return m
  30. }
  31. func (m *meter) Clear() {
  32. m.reset <- true
  33. }
  34. func (m *meter) Count() int64 {
  35. return (<-m.out).count
  36. }
  37. func (m *meter) Mark(n int64) {
  38. m.in <- n
  39. }
  40. func (m *meter) Rate1() float64 {
  41. return (<-m.out).rate1
  42. }
  43. func (m *meter) Rate5() float64 {
  44. return (<-m.out).rate5
  45. }
  46. func (m *meter) Rate15() float64 {
  47. return (<-m.out).rate15
  48. }
  49. func (m *meter) RateMean() float64 {
  50. return (<-m.out).rateMean
  51. }
  52. func (m *meter) arbiter() {
  53. var mv meterV
  54. a1 := NewEWMA1()
  55. a5 := NewEWMA5()
  56. a15 := NewEWMA15()
  57. tsStart := time.Nanoseconds()
  58. for {
  59. select {
  60. case n := <-m.in:
  61. mv.count += n
  62. a1.Update(n); mv.rate1 = a1.Rate()
  63. a5.Update(n); mv.rate5 = a5.Rate()
  64. a15.Update(n); mv.rate15 = a15.Rate()
  65. mv.rateMean = float64(1e9 * mv.count) / float64(
  66. time.Nanoseconds() - tsStart)
  67. case m.out <- mv:
  68. case <-m.reset:
  69. mv = meterV{}
  70. tsStart = time.Nanoseconds()
  71. case <-m.ticker.C:
  72. a1.Tick()
  73. a5.Tick()
  74. a15.Tick()
  75. }
  76. }
  77. }