meter.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package metrics
  2. import (
  3. "math"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. // Meters count events to produce exponentially-weighted moving average rates
  9. // at one-, five-, and fifteen-minutes and a mean rate.
  10. type Meter interface {
  11. Count() int64
  12. Mark(int64)
  13. Rate1() float64
  14. Rate5() float64
  15. Rate15() float64
  16. RateMean() float64
  17. Snapshot() Meter
  18. Stop()
  19. }
  20. // GetOrRegisterMeter returns an existing Meter or constructs and registers a
  21. // new StandardMeter.
  22. // Be sure to unregister the meter from the registry once it is of no use to
  23. // allow for garbage collection.
  24. func GetOrRegisterMeter(name string, r Registry) Meter {
  25. if nil == r {
  26. r = DefaultRegistry
  27. }
  28. return r.GetOrRegister(name, NewMeter).(Meter)
  29. }
  30. // NewMeter constructs a new StandardMeter and launches a goroutine.
  31. // Be sure to call Stop() once the meter is of no use to allow for garbage collection.
  32. func NewMeter() Meter {
  33. if UseNilMetrics {
  34. return NilMeter{}
  35. }
  36. m := newStandardMeter()
  37. arbiter.Lock()
  38. defer arbiter.Unlock()
  39. arbiter.meters[m] = struct{}{}
  40. if !arbiter.started {
  41. arbiter.started = true
  42. go arbiter.tick()
  43. }
  44. return m
  45. }
  46. // NewMeter constructs and registers a new StandardMeter and launches a
  47. // goroutine.
  48. // Be sure to unregister the meter from the registry once it is of no use to
  49. // allow for garbage collection.
  50. func NewRegisteredMeter(name string, r Registry) Meter {
  51. c := NewMeter()
  52. if nil == r {
  53. r = DefaultRegistry
  54. }
  55. r.Register(name, c)
  56. return c
  57. }
  58. // MeterSnapshot is a read-only copy of another Meter.
  59. type MeterSnapshot struct {
  60. count int64
  61. rate1, rate5, rate15, rateMean uint64
  62. }
  63. // Count returns the count of events at the time the snapshot was taken.
  64. func (m *MeterSnapshot) Count() int64 { return m.count }
  65. // Mark panics.
  66. func (*MeterSnapshot) Mark(n int64) {
  67. panic("Mark called on a MeterSnapshot")
  68. }
  69. // Rate1 returns the one-minute moving average rate of events per second at the
  70. // time the snapshot was taken.
  71. func (m *MeterSnapshot) Rate1() float64 { return math.Float64frombits(m.rate1) }
  72. // Rate5 returns the five-minute moving average rate of events per second at
  73. // the time the snapshot was taken.
  74. func (m *MeterSnapshot) Rate5() float64 { return math.Float64frombits(m.rate5) }
  75. // Rate15 returns the fifteen-minute moving average rate of events per second
  76. // at the time the snapshot was taken.
  77. func (m *MeterSnapshot) Rate15() float64 { return math.Float64frombits(m.rate15) }
  78. // RateMean returns the meter's mean rate of events per second at the time the
  79. // snapshot was taken.
  80. func (m *MeterSnapshot) RateMean() float64 { return math.Float64frombits(m.rateMean) }
  81. // Snapshot returns the snapshot.
  82. func (m *MeterSnapshot) Snapshot() Meter { return m }
  83. // Stop is a no-op.
  84. func (m *MeterSnapshot) Stop() {}
  85. // NilMeter is a no-op Meter.
  86. type NilMeter struct{}
  87. // Count is a no-op.
  88. func (NilMeter) Count() int64 { return 0 }
  89. // Mark is a no-op.
  90. func (NilMeter) Mark(n int64) {}
  91. // Rate1 is a no-op.
  92. func (NilMeter) Rate1() float64 { return 0.0 }
  93. // Rate5 is a no-op.
  94. func (NilMeter) Rate5() float64 { return 0.0 }
  95. // Rate15is a no-op.
  96. func (NilMeter) Rate15() float64 { return 0.0 }
  97. // RateMean is a no-op.
  98. func (NilMeter) RateMean() float64 { return 0.0 }
  99. // Snapshot is a no-op.
  100. func (NilMeter) Snapshot() Meter { return NilMeter{} }
  101. // Stop is a no-op.
  102. func (NilMeter) Stop() {}
  103. // StandardMeter is the standard implementation of a Meter.
  104. type StandardMeter struct {
  105. // Only used on stop.
  106. lock sync.Mutex
  107. snapshot *MeterSnapshot
  108. a1, a5, a15 EWMA
  109. startTime time.Time
  110. stopped uint32
  111. }
  112. func newStandardMeter() *StandardMeter {
  113. return &StandardMeter{
  114. snapshot: &MeterSnapshot{},
  115. a1: NewEWMA1(),
  116. a5: NewEWMA5(),
  117. a15: NewEWMA15(),
  118. startTime: time.Now(),
  119. }
  120. }
  121. // Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
  122. func (m *StandardMeter) Stop() {
  123. m.lock.Lock()
  124. stopped := m.stopped
  125. m.stopped = 1
  126. m.lock.Unlock()
  127. if stopped != 1 {
  128. arbiter.Lock()
  129. delete(arbiter.meters, m)
  130. arbiter.Unlock()
  131. }
  132. }
  133. // Count returns the number of events recorded.
  134. func (m *StandardMeter) Count() int64 {
  135. return atomic.LoadInt64(&m.snapshot.count)
  136. }
  137. // Mark records the occurance of n events.
  138. func (m *StandardMeter) Mark(n int64) {
  139. if atomic.LoadUint32(&m.stopped) == 1 {
  140. return
  141. }
  142. atomic.AddInt64(&m.snapshot.count, n)
  143. m.a1.Update(n)
  144. m.a5.Update(n)
  145. m.a15.Update(n)
  146. m.updateSnapshot()
  147. }
  148. // Rate1 returns the one-minute moving average rate of events per second.
  149. func (m *StandardMeter) Rate1() float64 {
  150. return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate1))
  151. }
  152. // Rate5 returns the five-minute moving average rate of events per second.
  153. func (m *StandardMeter) Rate5() float64 {
  154. return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate5))
  155. }
  156. // Rate15 returns the fifteen-minute moving average rate of events per second.
  157. func (m *StandardMeter) Rate15() float64 {
  158. return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate15))
  159. }
  160. // RateMean returns the meter's mean rate of events per second.
  161. func (m *StandardMeter) RateMean() float64 {
  162. return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rateMean))
  163. }
  164. // Snapshot returns a read-only copy of the meter.
  165. func (m *StandardMeter) Snapshot() Meter {
  166. copiedSnapshot := MeterSnapshot{
  167. count: atomic.LoadInt64(&m.snapshot.count),
  168. rate1: atomic.LoadUint64(&m.snapshot.rate1),
  169. rate5: atomic.LoadUint64(&m.snapshot.rate5),
  170. rate15: atomic.LoadUint64(&m.snapshot.rate15),
  171. rateMean: atomic.LoadUint64(&m.snapshot.rateMean),
  172. }
  173. return &copiedSnapshot
  174. }
  175. func (m *StandardMeter) updateSnapshot() {
  176. rate1 := math.Float64bits(m.a1.Rate())
  177. rate5 := math.Float64bits(m.a5.Rate())
  178. rate15 := math.Float64bits(m.a15.Rate())
  179. rateMean := math.Float64bits(float64(m.Count()) / time.Since(m.startTime).Seconds())
  180. atomic.StoreUint64(&m.snapshot.rate1, rate1)
  181. atomic.StoreUint64(&m.snapshot.rate5, rate5)
  182. atomic.StoreUint64(&m.snapshot.rate15, rate15)
  183. atomic.StoreUint64(&m.snapshot.rateMean, rateMean)
  184. }
  185. func (m *StandardMeter) tick() {
  186. m.a1.Tick()
  187. m.a5.Tick()
  188. m.a15.Tick()
  189. m.updateSnapshot()
  190. }
  191. // meterArbiter ticks meters every 5s from a single goroutine.
  192. // meters are references in a set for future stopping.
  193. type meterArbiter struct {
  194. sync.RWMutex
  195. started bool
  196. meters map[*StandardMeter]struct{}
  197. ticker *time.Ticker
  198. }
  199. var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})}
  200. // Ticks meters on the scheduled interval
  201. func (ma *meterArbiter) tick() {
  202. for {
  203. select {
  204. case <-ma.ticker.C:
  205. ma.tickMeters()
  206. }
  207. }
  208. }
  209. func (ma *meterArbiter) tickMeters() {
  210. ma.RLock()
  211. defer ma.RUnlock()
  212. for meter := range ma.meters {
  213. meter.tick()
  214. }
  215. }