| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package metrics
- import (
- "time"
- )
// Meter records a stream of events and reports how many have been seen
// and how fast they are arriving.
type Meter interface {
	// Mark records n additional events.
	Mark(int64)

	// Count returns the running total of everything passed to Mark.
	Count() int64

	// Rate1 returns the most recently published one-minute rate.
	// NOTE(review): the window length is inferred from the name of the
	// NewEWMA1 constructor used by the implementation in this file — confirm.
	Rate1() float64

	// Rate5 returns the most recently published five-minute rate
	// (same caveat as Rate1, via NewEWMA5).
	Rate5() float64

	// Rate15 returns the most recently published fifteen-minute rate
	// (same caveat as Rate1, via NewEWMA15).
	Rate15() float64

	// RateMean returns the mean rate in events per second, i.e. the total
	// count divided by the time elapsed since the meter started measuring.
	RateMean() float64
}
- type meter struct {
- in chan int64
- out chan meterV
- reset, tick chan bool
- }
// meterV is the value snapshot that the arbiter publishes on meter.out.
type meterV struct {
	// count is the sum of all marked values.
	count int64
	// rate1, rate5 and rate15 are the last rates read from the three
	// moving averages maintained by the arbiter.
	rate1  float64
	rate5  float64
	rate15 float64
	// rateMean is the mean rate in events per second.
	rateMean float64
}
- func NewMeter() Meter {
- m := &meter{
- make(chan int64),
- make(chan meterV),
- make(chan bool), make(chan bool),
- }
- go m.arbiter()
- go m.ticker()
- return m
- }
- func (m *meter) Clear() {
- m.reset <- true
- }
- func (m *meter) Count() int64 {
- return (<-m.out).count
- }
- func (m *meter) Mark(n int64) {
- m.in <- n
- }
- func (m *meter) Rate1() float64 {
- return (<-m.out).rate1
- }
- func (m *meter) Rate5() float64 {
- return (<-m.out).rate5
- }
- func (m *meter) Rate15() float64 {
- return (<-m.out).rate15
- }
- func (m *meter) RateMean() float64 {
- return (<-m.out).rateMean
- }
- func (m *meter) arbiter() {
- var mv meterV
- a1 := NewEWMA1()
- a5 := NewEWMA5()
- a15 := NewEWMA15()
- tsStart := time.Nanoseconds()
- for {
- select {
- case n := <-m.in:
- mv.count += n
- a1.Update(n); mv.rate1 = a1.Rate()
- a5.Update(n); mv.rate5 = a5.Rate()
- a15.Update(n); mv.rate15 = a15.Rate()
- mv.rateMean = float64(1e9 * mv.count) / float64(
- time.Nanoseconds() - tsStart)
- case m.out <- mv:
- case <-m.reset:
- mv = meterV{}
- a1.Clear()
- a5.Clear()
- a15.Clear()
- tsStart = time.Nanoseconds()
- case <-m.tick:
- a1.Tick()
- a5.Tick()
- a15.Tick()
- }
- }
- }
- func (m *meter) ticker() {
- for {
- time.Sleep(5e9)
- m.tick <- true
- }
- }
|