meter.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package metrics
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // Meters count events to produce exponentially-weighted moving average rates
  7. // at one-, five-, and fifteen-minutes and a mean rate.
  8. type Meter interface {
  9. Count() int64
  10. Mark(int64)
  11. Rate1() float64
  12. Rate5() float64
  13. Rate15() float64
  14. RateMean() float64
  15. Snapshot() Meter
  16. Stop()
  17. }
  18. // GetOrRegisterMeter returns an existing Meter or constructs and registers a
  19. // new StandardMeter.
  20. func GetOrRegisterMeter(name string, r Registry) Meter {
  21. if nil == r {
  22. r = DefaultRegistry
  23. }
  24. return r.GetOrRegister(name, NewMeter).(Meter)
  25. }
  26. // NewMeter constructs a new StandardMeter and launches a goroutine.
  27. func NewMeter() Meter {
  28. if UseNilMetrics {
  29. return NilMeter{}
  30. }
  31. m := newStandardMeter()
  32. arbiter.Lock()
  33. defer arbiter.Unlock()
  34. m.id = arbiter.newID()
  35. arbiter.meters[m.id] = m
  36. if !arbiter.started {
  37. arbiter.started = true
  38. go arbiter.tick()
  39. }
  40. return m
  41. }
  42. // NewMeter constructs and registers a new StandardMeter and launches a
  43. // goroutine.
  44. func NewRegisteredMeter(name string, r Registry) Meter {
  45. c := NewMeter()
  46. if nil == r {
  47. r = DefaultRegistry
  48. }
  49. r.Register(name, c)
  50. return c
  51. }
  52. // MeterSnapshot is a read-only copy of another Meter.
  53. type MeterSnapshot struct {
  54. count int64
  55. rate1, rate5, rate15, rateMean float64
  56. }
  57. // Count returns the count of events at the time the snapshot was taken.
  58. func (m *MeterSnapshot) Count() int64 { return m.count }
  59. // Mark panics.
  60. func (*MeterSnapshot) Mark(n int64) {
  61. panic("Mark called on a MeterSnapshot")
  62. }
  63. // Rate1 returns the one-minute moving average rate of events per second at the
  64. // time the snapshot was taken.
  65. func (m *MeterSnapshot) Rate1() float64 { return m.rate1 }
  66. // Rate5 returns the five-minute moving average rate of events per second at
  67. // the time the snapshot was taken.
  68. func (m *MeterSnapshot) Rate5() float64 { return m.rate5 }
  69. // Rate15 returns the fifteen-minute moving average rate of events per second
  70. // at the time the snapshot was taken.
  71. func (m *MeterSnapshot) Rate15() float64 { return m.rate15 }
  72. // RateMean returns the meter's mean rate of events per second at the time the
  73. // snapshot was taken.
  74. func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
  75. // Snapshot returns the snapshot.
  76. func (m *MeterSnapshot) Snapshot() Meter { return m }
  77. // Stop is a no-op.
  78. func (m *MeterSnapshot) Stop() {}
  79. // NilMeter is a no-op Meter.
  80. type NilMeter struct{}
  81. // Count is a no-op.
  82. func (NilMeter) Count() int64 { return 0 }
  83. // Mark is a no-op.
  84. func (NilMeter) Mark(n int64) {}
  85. // Rate1 is a no-op.
  86. func (NilMeter) Rate1() float64 { return 0.0 }
  87. // Rate5 is a no-op.
  88. func (NilMeter) Rate5() float64 { return 0.0 }
  89. // Rate15is a no-op.
  90. func (NilMeter) Rate15() float64 { return 0.0 }
  91. // RateMean is a no-op.
  92. func (NilMeter) RateMean() float64 { return 0.0 }
  93. // Snapshot is a no-op.
  94. func (NilMeter) Snapshot() Meter { return NilMeter{} }
  95. // Stop is a no-op.
  96. func (NilMeter) Stop() {}
  97. // StandardMeter is the standard implementation of a Meter.
  98. type StandardMeter struct {
  99. lock sync.RWMutex
  100. snapshot *MeterSnapshot
  101. a1, a5, a15 EWMA
  102. startTime time.Time
  103. stopped bool
  104. id int64
  105. }
  106. func newStandardMeter() *StandardMeter {
  107. return &StandardMeter{
  108. snapshot: &MeterSnapshot{},
  109. a1: NewEWMA1(),
  110. a5: NewEWMA5(),
  111. a15: NewEWMA15(),
  112. startTime: time.Now(),
  113. }
  114. }
  115. // Stop stops the meter, Mark() will panic if you use it after being stopped.
  116. func (m *StandardMeter) Stop() {
  117. arbiter.Lock()
  118. defer arbiter.Unlock()
  119. m.stopped = true
  120. delete(arbiter.meters, m.id)
  121. }
  122. // Count returns the number of events recorded.
  123. func (m *StandardMeter) Count() int64 {
  124. m.lock.RLock()
  125. count := m.snapshot.count
  126. m.lock.RUnlock()
  127. return count
  128. }
  129. // Mark records the occurance of n events.
  130. func (m *StandardMeter) Mark(n int64) {
  131. m.lock.Lock()
  132. defer m.lock.Unlock()
  133. if m.stopped {
  134. panic("Mark called on a stopped Meter")
  135. }
  136. m.snapshot.count += n
  137. m.a1.Update(n)
  138. m.a5.Update(n)
  139. m.a15.Update(n)
  140. m.updateSnapshot()
  141. }
  142. // Rate1 returns the one-minute moving average rate of events per second.
  143. func (m *StandardMeter) Rate1() float64 {
  144. m.lock.RLock()
  145. rate1 := m.snapshot.rate1
  146. m.lock.RUnlock()
  147. return rate1
  148. }
  149. // Rate5 returns the five-minute moving average rate of events per second.
  150. func (m *StandardMeter) Rate5() float64 {
  151. m.lock.RLock()
  152. rate5 := m.snapshot.rate5
  153. m.lock.RUnlock()
  154. return rate5
  155. }
  156. // Rate15 returns the fifteen-minute moving average rate of events per second.
  157. func (m *StandardMeter) Rate15() float64 {
  158. m.lock.RLock()
  159. rate15 := m.snapshot.rate15
  160. m.lock.RUnlock()
  161. return rate15
  162. }
  163. // RateMean returns the meter's mean rate of events per second.
  164. func (m *StandardMeter) RateMean() float64 {
  165. m.lock.RLock()
  166. rateMean := m.snapshot.rateMean
  167. m.lock.RUnlock()
  168. return rateMean
  169. }
  170. // Snapshot returns a read-only copy of the meter.
  171. func (m *StandardMeter) Snapshot() Meter {
  172. m.lock.RLock()
  173. snapshot := *m.snapshot
  174. m.lock.RUnlock()
  175. return &snapshot
  176. }
  177. func (m *StandardMeter) updateSnapshot() {
  178. // should run with write lock held on m.lock
  179. snapshot := m.snapshot
  180. snapshot.rate1 = m.a1.Rate()
  181. snapshot.rate5 = m.a5.Rate()
  182. snapshot.rate15 = m.a15.Rate()
  183. snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds()
  184. }
  185. func (m *StandardMeter) tick() {
  186. m.lock.Lock()
  187. defer m.lock.Unlock()
  188. m.a1.Tick()
  189. m.a5.Tick()
  190. m.a15.Tick()
  191. m.updateSnapshot()
  192. }
  193. type meterArbiter struct {
  194. sync.RWMutex
  195. started bool
  196. meters map[int64]*StandardMeter
  197. ticker *time.Ticker
  198. id int64
  199. }
  200. var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[int64]*StandardMeter)}
  201. // Ticks meters on the scheduled interval
  202. func (ma *meterArbiter) tick() {
  203. for {
  204. select {
  205. case <-ma.ticker.C:
  206. ma.tickMeters()
  207. }
  208. }
  209. }
  210. // should only be called with Lock() held
  211. func (ma *meterArbiter) newID() int64 {
  212. ma.id++
  213. return ma.id
  214. }
  215. func (ma *meterArbiter) tickMeters() {
  216. ma.RLock()
  217. defer ma.RUnlock()
  218. for _, meter := range ma.meters {
  219. meter.tick()
  220. }
  221. }