瀏覽代碼

Merge pull request #232 from deckarep/optimizations-lock-contention

Reduces lock-contention in high-performance usage of go-metrics.
Mikhail P 7 年之前
父節點
當前提交
f1af7f7bf2
共有 8 個文件被更改,包括 175 次插入69 次删除
  1. 0 1
      .travis.yml
  2. 33 13
      ewma.go
  3. 34 1
      ewma_test.go
  4. 7 9
      gauge_float64.go
  5. 10 0
      gauge_float64_test.go
  6. 19 0
      gauge_test.go
  7. 38 45
      meter.go
  8. 34 0
      meter_test.go

+ 0 - 1
.travis.yml

@@ -1,7 +1,6 @@
 language: go
 
 go:
-    - 1.2
     - 1.3
     - 1.4
     - 1.5

+ 33 - 13
ewma.go

@@ -79,16 +79,15 @@ func (NilEWMA) Update(n int64) {}
 type StandardEWMA struct {
 	uncounted int64 // /!\ this should be the first member to ensure 64-bit alignment
 	alpha     float64
-	rate      float64
-	init      bool
+	rate      uint64
+	init      uint32
 	mutex     sync.Mutex
 }
 
 // Rate returns the moving average rate of events per second.
 func (a *StandardEWMA) Rate() float64 {
-	a.mutex.Lock()
-	defer a.mutex.Unlock()
-	return a.rate * float64(1e9)
+	currentRate := math.Float64frombits(atomic.LoadUint64(&a.rate)) * float64(1e9)
+	return currentRate
 }
 
 // Snapshot returns a read-only copy of the EWMA.
@@ -99,17 +98,38 @@ func (a *StandardEWMA) Snapshot() EWMA {
 // Tick ticks the clock to update the moving average.  It assumes it is called
 // every five seconds.
 func (a *StandardEWMA) Tick() {
+	// Optimization to avoid mutex locking in the hot-path.
+	if atomic.LoadUint32(&a.init) == 1 {
+		a.updateRate(a.fetchInstantRate())
+	} else {
+		// Slow-path: this is only needed on the first Tick() and preserves transactional updating
+		// of init and rate in the else block. The first conditional is needed below because
+		// a different thread could have set a.init = 1 between the time of the first atomic load and when
+		// the lock was acquired.
+		a.mutex.Lock()
+		if atomic.LoadUint32(&a.init) == 1 {
+			// The fetchInstantRate() uses atomic loading, which is unnecessary in this critical section
+			// but again, this section is only invoked on the first successful Tick() operation.
+			a.updateRate(a.fetchInstantRate())
+		} else {
+			atomic.StoreUint32(&a.init, 1)
+			atomic.StoreUint64(&a.rate, math.Float64bits(a.fetchInstantRate()))
+		}
+		a.mutex.Unlock()
+	}
+}
+
+func (a *StandardEWMA) fetchInstantRate() float64 {
 	count := atomic.LoadInt64(&a.uncounted)
 	atomic.AddInt64(&a.uncounted, -count)
 	instantRate := float64(count) / float64(5e9)
-	a.mutex.Lock()
-	defer a.mutex.Unlock()
-	if a.init {
-		a.rate += a.alpha * (instantRate - a.rate)
-	} else {
-		a.init = true
-		a.rate = instantRate
-	}
+	return instantRate
+}
+
+func (a *StandardEWMA) updateRate(instantRate float64) {
+	currentRate := math.Float64frombits(atomic.LoadUint64(&a.rate))
+	currentRate += a.alpha * (instantRate - currentRate)
+	atomic.StoreUint64(&a.rate, math.Float64bits(currentRate))
 }
 
 // Update adds n uncounted events.

+ 34 - 1
ewma_test.go

@@ -1,6 +1,11 @@
 package metrics
 
-import "testing"
+import (
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+)
 
 func BenchmarkEWMA(b *testing.B) {
 	a := NewEWMA1()
@@ -11,6 +16,34 @@ func BenchmarkEWMA(b *testing.B) {
 	}
 }
 
+func BenchmarkEWMAParallel(b *testing.B) {
+	a := NewEWMA1()
+	b.ResetTimer()
+
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			a.Update(1)
+			a.Tick()
+		}
+	})
+}
+
+// exercise race detector
+func TestEWMAConcurrency(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+	a := NewEWMA1()
+	wg := &sync.WaitGroup{}
+	reps := 100
+	for i := 0; i < reps; i++ {
+		wg.Add(1)
+		go func(ewma EWMA, wg *sync.WaitGroup) {
+			a.Update(rand.Int63())
+			wg.Done()
+		}(a, wg)
+	}
+	wg.Wait()
+}
+
 func TestEWMA1(t *testing.T) {
 	a := NewEWMA1()
 	a.Update(3)

+ 7 - 9
gauge_float64.go

@@ -1,6 +1,9 @@
 package metrics
 
-import "sync"
+import (
+	"math"
+	"sync/atomic"
+)
 
 // GaugeFloat64s hold a float64 value that can be set arbitrarily.
 type GaugeFloat64 interface {
@@ -85,8 +88,7 @@ func (NilGaugeFloat64) Value() float64 { return 0.0 }
 // StandardGaugeFloat64 is the standard implementation of a GaugeFloat64 and uses
 // atomic operations on the value's bit pattern to manage a single float64 value.
 type StandardGaugeFloat64 struct {
-	mutex sync.Mutex
-	value float64
+	value uint64
 }
 
 // Snapshot returns a read-only copy of the gauge.
@@ -96,16 +98,12 @@ func (g *StandardGaugeFloat64) Snapshot() GaugeFloat64 {
 
 // Update updates the gauge's value.
 func (g *StandardGaugeFloat64) Update(v float64) {
-	g.mutex.Lock()
-	defer g.mutex.Unlock()
-	g.value = v
+	atomic.StoreUint64(&g.value, math.Float64bits(v))
 }
 
 // Value returns the gauge's current value.
 func (g *StandardGaugeFloat64) Value() float64 {
-	g.mutex.Lock()
-	defer g.mutex.Unlock()
-	return g.value
+	return math.Float64frombits(atomic.LoadUint64(&g.value))
 }
 
 // FunctionalGaugeFloat64 returns value from given function

+ 10 - 0
gauge_float64_test.go

@@ -10,6 +10,16 @@ func BenchmarkGuageFloat64(b *testing.B) {
 	}
 }
 
+func BenchmarkGuageFloat64Parallel(b *testing.B) {
+	g := NewGaugeFloat64()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			g.Update(float64(1))
+		}
+	})
+}
+
 func TestGaugeFloat64(t *testing.T) {
 	g := NewGaugeFloat64()
 	g.Update(float64(47.0))

+ 19 - 0
gauge_test.go

@@ -2,7 +2,10 @@ package metrics
 
 import (
 	"fmt"
+	"math/rand"
+	"sync"
 	"testing"
+	"time"
 )
 
 func BenchmarkGuage(b *testing.B) {
@@ -13,6 +16,22 @@ func BenchmarkGuage(b *testing.B) {
 	}
 }
 
+// exercise race detector
+func TestGaugeConcurrency(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+	g := NewGauge()
+	wg := &sync.WaitGroup{}
+	reps := 100
+	for i := 0; i < reps; i++ {
+		wg.Add(1)
+		go func(g Gauge, wg *sync.WaitGroup) {
+			g.Update(rand.Int63())
+			wg.Done()
+		}(g, wg)
+	}
+	wg.Wait()
+}
+
 func TestGauge(t *testing.T) {
 	g := NewGauge()
 	g.Update(int64(47))

+ 38 - 45
meter.go

@@ -1,7 +1,9 @@
 package metrics
 
 import (
+	"math"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -62,7 +64,7 @@ func NewRegisteredMeter(name string, r Registry) Meter {
 // MeterSnapshot is a read-only copy of another Meter.
 type MeterSnapshot struct {
 	count                          int64
-	rate1, rate5, rate15, rateMean float64
+	rate1, rate5, rate15, rateMean uint64
 }
 
 // Count returns the count of events at the time the snapshot was taken.
@@ -75,19 +77,19 @@ func (*MeterSnapshot) Mark(n int64) {
 
 // Rate1 returns the one-minute moving average rate of events per second at the
 // time the snapshot was taken.
-func (m *MeterSnapshot) Rate1() float64 { return m.rate1 }
+func (m *MeterSnapshot) Rate1() float64 { return math.Float64frombits(m.rate1) }
 
 // Rate5 returns the five-minute moving average rate of events per second at
 // the time the snapshot was taken.
-func (m *MeterSnapshot) Rate5() float64 { return m.rate5 }
+func (m *MeterSnapshot) Rate5() float64 { return math.Float64frombits(m.rate5) }
 
 // Rate15 returns the fifteen-minute moving average rate of events per second
 // at the time the snapshot was taken.
-func (m *MeterSnapshot) Rate15() float64 { return m.rate15 }
+func (m *MeterSnapshot) Rate15() float64 { return math.Float64frombits(m.rate15) }
 
 // RateMean returns the meter's mean rate of events per second at the time the
 // snapshot was taken.
-func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
+func (m *MeterSnapshot) RateMean() float64 { return math.Float64frombits(m.rateMean) }
 
 // Snapshot returns the snapshot.
 func (m *MeterSnapshot) Snapshot() Meter { return m }
@@ -124,11 +126,12 @@ func (NilMeter) Stop() {}
 
 // StandardMeter is the standard implementation of a Meter.
 type StandardMeter struct {
-	lock        sync.RWMutex
+	// Only used on stop.
+	lock        sync.Mutex
 	snapshot    *MeterSnapshot
 	a1, a5, a15 EWMA
 	startTime   time.Time
-	stopped     bool
+	stopped     uint32
 }
 
 func newStandardMeter() *StandardMeter {
@@ -145,9 +148,9 @@ func newStandardMeter() *StandardMeter {
 func (m *StandardMeter) Stop() {
 	m.lock.Lock()
 	stopped := m.stopped
-	m.stopped = true
+	m.stopped = 1
 	m.lock.Unlock()
-	if !stopped {
+	if stopped != 1 {
 		arbiter.Lock()
 		delete(arbiter.meters, m)
 		arbiter.Unlock()
@@ -156,20 +159,17 @@ func (m *StandardMeter) Stop() {
 
 // Count returns the number of events recorded.
 func (m *StandardMeter) Count() int64 {
-	m.lock.RLock()
-	count := m.snapshot.count
-	m.lock.RUnlock()
-	return count
+	return atomic.LoadInt64(&m.snapshot.count)
 }
 
 // Mark records the occurrence of n events.
 func (m *StandardMeter) Mark(n int64) {
-	m.lock.Lock()
-	defer m.lock.Unlock()
-	if m.stopped {
+	if atomic.LoadUint32(&m.stopped) == 1 {
 		return
 	}
-	m.snapshot.count += n
+
+	atomic.AddInt64(&m.snapshot.count, n)
+
 	m.a1.Update(n)
 	m.a5.Update(n)
 	m.a15.Update(n)
@@ -178,56 +178,49 @@ func (m *StandardMeter) Mark(n int64) {
 
 // Rate1 returns the one-minute moving average rate of events per second.
 func (m *StandardMeter) Rate1() float64 {
-	m.lock.RLock()
-	rate1 := m.snapshot.rate1
-	m.lock.RUnlock()
-	return rate1
+	return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate1))
 }
 
 // Rate5 returns the five-minute moving average rate of events per second.
 func (m *StandardMeter) Rate5() float64 {
-	m.lock.RLock()
-	rate5 := m.snapshot.rate5
-	m.lock.RUnlock()
-	return rate5
+	return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate5))
 }
 
 // Rate15 returns the fifteen-minute moving average rate of events per second.
 func (m *StandardMeter) Rate15() float64 {
-	m.lock.RLock()
-	rate15 := m.snapshot.rate15
-	m.lock.RUnlock()
-	return rate15
+	return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate15))
 }
 
 // RateMean returns the meter's mean rate of events per second.
 func (m *StandardMeter) RateMean() float64 {
-	m.lock.RLock()
-	rateMean := m.snapshot.rateMean
-	m.lock.RUnlock()
-	return rateMean
+	return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rateMean))
 }
 
 // Snapshot returns a read-only copy of the meter.
 func (m *StandardMeter) Snapshot() Meter {
-	m.lock.RLock()
-	snapshot := *m.snapshot
-	m.lock.RUnlock()
-	return &snapshot
+	copiedSnapshot := MeterSnapshot{
+		count:    atomic.LoadInt64(&m.snapshot.count),
+		rate1:    atomic.LoadUint64(&m.snapshot.rate1),
+		rate5:    atomic.LoadUint64(&m.snapshot.rate5),
+		rate15:   atomic.LoadUint64(&m.snapshot.rate15),
+		rateMean: atomic.LoadUint64(&m.snapshot.rateMean),
+	}
+	return &copiedSnapshot
 }
 
 func (m *StandardMeter) updateSnapshot() {
-	// should run with write lock held on m.lock
-	snapshot := m.snapshot
-	snapshot.rate1 = m.a1.Rate()
-	snapshot.rate5 = m.a5.Rate()
-	snapshot.rate15 = m.a15.Rate()
-	snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds()
+	rate1 := math.Float64bits(m.a1.Rate())
+	rate5 := math.Float64bits(m.a5.Rate())
+	rate15 := math.Float64bits(m.a15.Rate())
+	rateMean := math.Float64bits(float64(m.Count()) / time.Since(m.startTime).Seconds())
+
+	atomic.StoreUint64(&m.snapshot.rate1, rate1)
+	atomic.StoreUint64(&m.snapshot.rate5, rate5)
+	atomic.StoreUint64(&m.snapshot.rate15, rate15)
+	atomic.StoreUint64(&m.snapshot.rateMean, rateMean)
 }
 
 func (m *StandardMeter) tick() {
-	m.lock.Lock()
-	defer m.lock.Unlock()
 	m.a1.Tick()
 	m.a5.Tick()
 	m.a15.Tick()

+ 34 - 0
meter_test.go

@@ -1,6 +1,8 @@
 package metrics
 
 import (
+	"math/rand"
+	"sync"
 	"testing"
 	"time"
 )
@@ -13,6 +15,38 @@ func BenchmarkMeter(b *testing.B) {
 	}
 }
 
+func BenchmarkMeterParallel(b *testing.B) {
+	m := NewMeter()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			m.Mark(1)
+		}
+	})
+}
+
+// exercise race detector
+func TestMeterConcurrency(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+	ma := meterArbiter{
+		ticker: time.NewTicker(time.Millisecond),
+		meters: make(map[*StandardMeter]struct{}),
+	}
+	m := newStandardMeter()
+	ma.meters[m] = struct{}{}
+	go ma.tick()
+	wg := &sync.WaitGroup{}
+	reps := 100
+	for i := 0; i < reps; i++ {
+		wg.Add(1)
+		go func(m Meter, wg *sync.WaitGroup) {
+			m.Mark(1)
+			wg.Done()
+		}(m, wg)
+	}
+	wg.Wait()
+}
+
 func TestGetOrRegisterMeter(t *testing.T) {
 	r := NewRegistry()
 	NewRegisteredMeter("foo", r).Mark(47)