Sfoglia il codice sorgente

Added meters and exponentially-weighted moving averages.

Richard Crowley 14 anni fa
parent
commit
7272529a65
6 ha cambiato i file con 440 aggiunte e 4 eliminazioni
  1. 1 0
      Makefile
  2. 26 0
      cmd/metrics/metrics.go
  3. 82 0
      ewma.go
  4. 216 0
      ewma_test.go
  5. 95 4
      meter.go
  6. 20 0
      meter_test.go

+ 1 - 0
Makefile

@@ -3,6 +3,7 @@ include $(GOROOT)/src/Make.inc
 TARG=metrics
 GOFILES=\
 	counter.go\
+	ewma.go\
 	gauge.go\
 	healthcheck.go\
 	histogram.go\

+ 26 - 0
cmd/metrics/metrics.go

@@ -56,6 +56,7 @@ func main() {
 	}
 */
 
+/*
 	s := metrics.NewExpDecaySample(1028, 0.015)
 //	s := metrics.NewUniformSample(1028)
 	h := metrics.NewHistogram(s)
@@ -83,5 +84,30 @@ func main() {
 		)
 		time.Sleep(500e6)
 	}
+*/
+
+	m := metrics.NewMeter()
+	r.RegisterMeter("bang", m)
+	for i := 0; i < 1000; i++ {
+		go func() {
+			for {
+				m.Mark(19)
+				time.Sleep(300e6)
+			}
+		}()
+		go func() {
+			for {
+				m.Mark(47)
+				time.Sleep(400e6)
+			}
+		}()
+	}
+	for {
+		fmt.Printf(
+			"m: %v %v %v %v %v\n",
+			m.Count(), m.Rate1(), m.Rate5(), m.Rate15(), m.RateMean(),
+		)
+		time.Sleep(500e6)
+	}
 
 }

+ 82 - 0
ewma.go

@@ -0,0 +1,82 @@
+package metrics
+
+import (
+	"math"
+)
+
+type EWMA interface {
+	Clear()
+	Rate() float64
+	Tick()
+	Update(int64)
+}
+
+type ewma struct {
+	alpha float64
+	in chan int64
+	out chan float64
+	reset, tick chan bool
+}
+
+func NewEWMA(alpha float64) EWMA {
+	a := &ewma{
+		alpha,
+		make(chan int64),
+		make(chan float64),
+		make(chan bool), make(chan bool),
+	}
+	go a.arbiter()
+	return a
+}
+
+func NewEWMA1() EWMA {
+	return NewEWMA(1 - math.Exp(-5.0 / 60.0 / 1))
+}
+
+func NewEWMA5() EWMA {
+	return NewEWMA(1 - math.Exp(-5.0 / 60.0 / 5))
+}
+
+func NewEWMA15() EWMA {
+	return NewEWMA(1 - math.Exp(-5.0 / 60.0 / 15))
+}
+
+func (a *ewma) Clear() {
+	a.reset <- true
+}
+
+func (a *ewma) Rate() float64 {
+	return <-a.out * float64(1e9)
+}
+
+func (a *ewma) Tick() {
+	a.tick <- true
+}
+
+func (a *ewma) Update(n int64) {
+	a.in <- n
+}
+
+func (a *ewma) arbiter() {
+	initialized := false
+	var uncounted int64
+	var rate float64
+	for {
+		select {
+		case n := <-a.in: uncounted += n
+		case a.out <- rate:
+		case <-a.reset:
+			uncounted = 0
+			rate = 0.0
+		case <-a.tick:
+			instantRate := float64(uncounted) / float64(5e9)
+			if initialized {
+				rate += a.alpha * (instantRate - rate)
+			} else {
+				initialized = true
+				rate = instantRate
+			}
+			uncounted = 0
+		}
+	}
+}

+ 216 - 0
ewma_test.go

@@ -0,0 +1,216 @@
+package metrics
+
+import (
+	"testing"
+)
+
+func TestEWMA1(t *testing.T) {
+	a := NewEWMA1()
+	a.Update(3)
+	a.Tick()
+	if rate := a.Rate(); 0.6 != rate {
+		t.Errorf("initial a.Rate(): 0.6 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.22072766470286553 != rate {
+		t.Errorf("1 minute a.Rate(): 0.22072766470286553 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.08120116994196772 != rate {
+		t.Errorf("2 minute a.Rate(): 0.08120116994196772 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.029872241020718428 != rate {
+		t.Errorf("3 minute a.Rate(): 0.029872241020718428 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.01098938333324054 != rate {
+		t.Errorf("4 minute a.Rate(): 0.01098938333324054 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.004042768199451294 != rate {
+		t.Errorf("5 minute a.Rate(): 0.004042768199451294 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.0014872513059998212 != rate {
+		t.Errorf("6 minute a.Rate(): 0.0014872513059998212 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.0005471291793327122 != rate {
+		t.Errorf("7 minute a.Rate(): 0.0005471291793327122 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.00020127757674150815 != rate {
+		t.Errorf("8 minute a.Rate(): 0.00020127757674150815 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 7.404588245200814e-05 != rate {
+		t.Errorf("9 minute a.Rate(): 7.404588245200814e-05 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 2.7239957857491083e-05 != rate {
+		t.Errorf("10 minute a.Rate(): 2.7239957857491083e-05 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 1.0021020474147462e-05 != rate {
+		t.Errorf("11 minute a.Rate(): 1.0021020474147462e-05 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 3.6865274119969525e-06 != rate {
+		t.Errorf("12 minute a.Rate(): 3.6865274119969525e-06 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 1.3561976441886433e-06 != rate {
+		t.Errorf("13 minute a.Rate(): 1.3561976441886433e-06 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 4.989172314621449e-07 != rate {
+		t.Errorf("14 minute a.Rate(): 4.989172314621449e-07 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 1.8354139230109722e-07 != rate {
+		t.Errorf("15 minute a.Rate(): 1.8354139230109722e-07 != %v\n", rate)
+	}
+}
+
+func TestEWMA5(t *testing.T) {
+	a := NewEWMA5()
+	a.Update(3)
+	a.Tick()
+	if rate := a.Rate(); 0.6 != rate {
+		t.Errorf("initial a.Rate(): 0.6 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.49123845184678905 != rate {
+		t.Errorf("1 minute a.Rate(): 0.49123845184678905 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.4021920276213837 != rate {
+		t.Errorf("2 minute a.Rate(): 0.4021920276213837 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.32928698165641596 != rate {
+		t.Errorf("3 minute a.Rate(): 0.32928698165641596 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.269597378470333 != rate {
+		t.Errorf("4 minute a.Rate(): 0.269597378470333 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.2207276647028654 != rate {
+		t.Errorf("5 minute a.Rate(): 0.2207276647028654 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.18071652714732128 != rate {
+		t.Errorf("6 minute a.Rate(): 0.18071652714732128 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.14795817836496392 != rate {
+		t.Errorf("7 minute a.Rate(): 0.14795817836496392 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.12113791079679326 != rate {
+		t.Errorf("8 minute a.Rate(): 0.12113791079679326 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.09917933293295193 != rate {
+		t.Errorf("9 minute a.Rate(): 0.09917933293295193 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.08120116994196763 != rate {
+		t.Errorf("10 minute a.Rate(): 0.08120116994196763 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.06648189501740036 != rate {
+		t.Errorf("11 minute a.Rate(): 0.06648189501740036 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.05443077197364752 != rate {
+		t.Errorf("12 minute a.Rate(): 0.05443077197364752 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.04456414692860035 != rate {
+		t.Errorf("13 minute a.Rate(): 0.04456414692860035 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.03648603757513079 != rate {
+		t.Errorf("14 minute a.Rate(): 0.03648603757513079 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.0298722410207183831020718428 != rate {
+		t.Errorf("15 minute a.Rate(): 0.0298722410207183831020718428 != %v\n", rate)
+	}
+}
+
+func TestEWMA15(t *testing.T) {
+	a := NewEWMA15()
+	a.Update(3)
+	a.Tick()
+	if rate := a.Rate(); 0.6 != rate {
+		t.Errorf("initial a.Rate(): 0.6 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.5613041910189706 != rate {
+		t.Errorf("1 minute a.Rate(): 0.5613041910189706 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.5251039914257684 != rate {
+		t.Errorf("2 minute a.Rate(): 0.5251039914257684 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.4912384518467888184678905 != rate {
+		t.Errorf("3 minute a.Rate(): 0.4912384518467888184678905 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.459557003018789 != rate {
+		t.Errorf("4 minute a.Rate(): 0.459557003018789 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.4299187863442732 != rate {
+		t.Errorf("5 minute a.Rate(): 0.4299187863442732 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.4021920276213831 != rate {
+		t.Errorf("6 minute a.Rate(): 0.4021920276213831 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.37625345116383313 != rate {
+		t.Errorf("7 minute a.Rate(): 0.37625345116383313 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.3519877317060185 != rate {
+		t.Errorf("8 minute a.Rate(): 0.3519877317060185 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.3292869816564153165641596 != rate {
+		t.Errorf("9 minute a.Rate(): 0.3292869816564153165641596 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.3080502714195546 != rate {
+		t.Errorf("10 minute a.Rate(): 0.3080502714195546 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.2881831806538789 != rate {
+		t.Errorf("11 minute a.Rate(): 0.2881831806538789 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.26959737847033216 != rate {
+		t.Errorf("12 minute a.Rate(): 0.26959737847033216 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.2522102307052083 != rate {
+		t.Errorf("13 minute a.Rate(): 0.2522102307052083 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.23594443252115815 != rate {
+		t.Errorf("14 minute a.Rate(): 0.23594443252115815 != %v\n", rate)
+	}
+	elapseMinute(a)
+	if rate := a.Rate(); 0.2207276647028646247028654470286553 != rate {
+		t.Errorf("15 minute a.Rate(): 0.2207276647028646247028654470286553 != %v\n", rate)
+	}
+}
+
+func elapseMinute(a EWMA) {
+	for i := 0; i < 12; i++ { a.Tick() }
+}

+ 95 - 4
meter.go

@@ -1,10 +1,101 @@
 package metrics
 
+import (
+	"time"
+)
+
 type Meter interface {
 	Count() int64
 	Mark(int64)
-	Rate1()
-	Rate5()
-	Rate15()
-	RateMean()
+	Rate1() float64
+	Rate5() float64
+	Rate15() float64
+	RateMean() float64
+}
+
+type meter struct {
+	in chan int64
+	out chan meterV
+	reset, tick chan bool
+}
+
+type meterV struct {
+	count int64
+	rate1, rate5, rate15, rateMean float64
+}
+
+func NewMeter() Meter {
+	m := &meter{
+		make(chan int64),
+		make(chan meterV),
+		make(chan bool), make(chan bool),
+	}
+	go m.arbiter()
+	go m.ticker()
+	return m
+}
+
+func (m *meter) Clear() {
+	m.reset <- true
+}
+
+func (m *meter) Count() int64 {
+	return (<-m.out).count
+}
+
+func (m *meter) Mark(n int64) {
+	m.in <- n
+}
+
+func (m *meter) Rate1() float64 {
+	return (<-m.out).rate1
+}
+
+func (m *meter) Rate5() float64 {
+	return (<-m.out).rate5
+}
+
+func (m *meter) Rate15() float64 {
+	return (<-m.out).rate15
+}
+
+func (m *meter) RateMean() float64 {
+	return (<-m.out).rateMean
+}
+
+func (m *meter) arbiter() {
+	var mv meterV
+	a1 := NewEWMA1()
+	a5 := NewEWMA5()
+	a15 := NewEWMA15()
+	tsStart := time.Nanoseconds()
+	for {
+		select {
+		case n := <-m.in:
+			mv.count += n
+			a1.Update(n); mv.rate1 = a1.Rate()
+			a5.Update(n); mv.rate5 = a5.Rate()
+			a15.Update(n); mv.rate15 = a15.Rate()
+			mv.rateMean = float64(1e9 * mv.count) / float64(
+				time.Nanoseconds() - tsStart)
+		case m.out <- mv:
+		case <-m.reset:
+			mv = meterV{}
+			a1.Clear()
+			a5.Clear()
+			a15.Clear()
+			tsStart = time.Nanoseconds()
+		case <-m.tick:
+			a1.Tick()
+			a5.Tick()
+			a15.Tick()
+		}
+	}
+}
+
+func (m *meter) ticker() {
+	for {
+		time.Sleep(5e9)
+		m.tick <- true
+	}
 }

+ 20 - 0
meter_test.go

@@ -0,0 +1,20 @@
+package metrics
+
+import (
+	"testing"
+)
+
+func TestMeterZero(t *testing.T) {
+	m := NewMeter()
+	if count := m.Count(); 0 != count {
+		t.Errorf("m.Count(): 0 != %v\n", count)
+	}
+}
+
+func TestMeterNonzero(t *testing.T) {
+	m := NewMeter()
+	m.Mark(3)
+	if count := m.Count(); 3 != count {
+		t.Errorf("m.Count(): 3 != %v\n", count)
+	}
+}