Forráskód Böngészése

Add wait statistics to pools

Add WaitCount and WaitDuration stats to pool stats to enable tracking of the delays introduced by waiting for connections from the pool.

This uses the same properties as database/sql so it's familiar to users.

Addresses: #403
Steven Hartland 6 éve
szülő
commit
42f3569497
2 módosított fájl, 100 hozzáadás és 13 törlés
  1. 36 7
      redis/pool.go
  2. 64 6
      redis/pool_test.go

+ 36 - 7
redis/pool.go

@@ -154,11 +154,13 @@ type Pool struct {
 
 	chInitialized uint32 // set to 1 when field ch is initialized
 
-	mu     sync.Mutex    // mu protects the following fields
-	closed bool          // set to true when the pool is closed.
-	active int           // the number of open connections in the pool
-	ch     chan struct{} // limits open connections when p.Wait is true
-	idle   idleList      // idle connections
+	mu           sync.Mutex    // mu protects the following fields
+	closed       bool          // set to true when the pool is closed.
+	active       int           // the number of open connections in the pool
+	ch           chan struct{} // limits open connections when p.Wait is true
+	idle         idleList      // idle connections
+	waitCount    int64         // total number of connections waited for.
+	waitDuration time.Duration // total time waited for new connections.
 }
 
 // NewPool creates a new pool.
@@ -188,14 +190,24 @@ type PoolStats struct {
 	ActiveCount int
 	// IdleCount is the number of idle connections in the pool.
 	IdleCount int
+
+	// WaitCount is the total number of connections waited for.
+	// This value is currently not guaranteed to be 100% accurate.
+	WaitCount int64
+
+	// WaitDuration is the total time blocked waiting for a new connection.
+	// This value is currently not guaranteed to be 100% accurate.
+	WaitDuration time.Duration
 }
 
 // Stats returns pool's statistics.
 func (p *Pool) Stats() PoolStats {
 	p.mu.Lock()
 	stats := PoolStats{
-		ActiveCount: p.active,
-		IdleCount:   p.idle.count,
+		ActiveCount:  p.active,
+		IdleCount:    p.idle.count,
+		WaitCount:    p.waitCount,
+		WaitDuration: p.waitDuration,
 	}
 	p.mu.Unlock()
 
@@ -270,8 +282,17 @@ func (p *Pool) get(ctx interface {
 }) (*poolConn, error) {
 
 	// Handle limit for p.Wait == true.
+	var waited time.Duration
 	if p.Wait && p.MaxActive > 0 {
 		p.lazyInit()
+
 		// wait indicates if we believe it will block so it's not 100% accurate,
 		// however for stats it should be good enough.
+		wait := len(p.ch) == 0
+		var start time.Time
+		if wait {
+			start = time.Now()
+		}
 		if ctx == nil {
 			<-p.ch
 		} else {
@@ -281,10 +302,18 @@ func (p *Pool) get(ctx interface {
 				return nil, ctx.Err()
 			}
 		}
+		if wait {
+			waited = time.Since(start)
+		}
 	}
 
 	p.mu.Lock()
 
+	if waited > 0 {
+		p.waitCount++
+		p.waitDuration += waited
+	}
+
 	// Prune stale connections at the back of the idle list.
 	if p.IdleTimeout > 0 {
 		n := p.idle.count

+ 64 - 6
redis/pool_test.go

@@ -25,6 +25,10 @@ import (
 	"github.com/gomodule/redigo/redis"
 )
 
+const (
+	testGoRoutines = 10
+)
+
 type poolTestConn struct {
 	d   *poolDialer
 	err error
@@ -84,7 +88,13 @@ func (d *poolDialer) dial() (redis.Conn, error) {
 }
 
 func (d *poolDialer) check(message string, p *redis.Pool, dialed, open, inuse int) {
+	d.checkAll(message, p, dialed, open, inuse, 0, 0)
+}
+
+func (d *poolDialer) checkAll(message string, p *redis.Pool, dialed, open, inuse int, waitCountMax int64, waitDurationMax time.Duration) {
 	d.mu.Lock()
+	defer d.mu.Unlock()
+
 	if d.dialed != dialed {
 		d.t.Errorf("%s: dialed=%d, want %d", message, d.dialed, dialed)
 	}
@@ -101,7 +111,20 @@ func (d *poolDialer) check(message string, p *redis.Pool, dialed, open, inuse in
 		d.t.Errorf("%s: idle=%d, want %d", message, stats.IdleCount, open-inuse)
 	}
 
-	d.mu.Unlock()
+	if stats.WaitCount > waitCountMax {
+		d.t.Errorf("%s: unexpected wait=%d want at most %d", message, stats.WaitCount, waitCountMax)
+	}
+
+	if waitCountMax == 0 {
+		if stats.WaitDuration != 0 {
+			d.t.Errorf("%s: unexpected waitDuration=%v want %v", message, stats.WaitDuration, 0)
+		}
+		return
+	}
+
+	if stats.WaitDuration > waitDurationMax {
+		d.t.Errorf("%s: unexpected waitDuration=%v want < %v", message, stats.WaitDuration, waitDurationMax)
+	}
 }
 
 func TestPoolReuse(t *testing.T) {
@@ -375,6 +398,37 @@ func TestPoolMaxActive(t *testing.T) {
 	d.check("4", p, 2, 2, 1)
 }
 
+func TestPoolWaitStats(t *testing.T) {
+	d := poolDialer{t: t}
+	p := &redis.Pool{
+		Wait:      true,
+		MaxIdle:   2,
+		MaxActive: 2,
+		Dial:      d.dial,
+	}
+	defer p.Close()
+
+	c1 := p.Get()
+	c1.Do("PING")
+	c2 := p.Get()
+	c2.Do("PING")
+
+	d.checkAll("1", p, 2, 2, 2, 0, 0)
+
+	start := time.Now()
+	go func() {
+		time.Sleep(time.Millisecond * 100)
+		c1.Close()
+	}()
+
+	c3 := p.Get()
+	d.checkAll("2", p, 2, 2, 2, 1, time.Since(start))
+
+	if _, err := c3.Do("PING"); err != nil {
+		t.Errorf("expected good channel, err=%v", err)
+	}
+}
+
 func TestPoolMonitorCleanup(t *testing.T) {
 	d := poolDialer{t: t}
 	p := &redis.Pool{
@@ -493,7 +547,7 @@ func TestPoolTransactionCleanup(t *testing.T) {
 }
 
 func startGoroutines(p *redis.Pool, cmd string, args ...interface{}) chan error {
-	errs := make(chan error, 10)
+	errs := make(chan error, testGoRoutines)
 	for i := 0; i < cap(errs); i++ {
 		go func() {
 			c := p.Get()
@@ -517,6 +571,7 @@ func TestWaitPool(t *testing.T) {
 	defer p.Close()
 
 	c := p.Get()
+	start := time.Now()
 	errs := startGoroutines(p, "PING")
 	d.check("before close", p, 1, 1, 1)
 	c.Close()
@@ -531,7 +586,7 @@ func TestWaitPool(t *testing.T) {
 			t.Fatalf("timeout waiting for blocked goroutine %d", i)
 		}
 	}
-	d.check("done", p, 1, 1, 0)
+	d.checkAll("done", p, 1, 1, 0, testGoRoutines, time.Since(start)*testGoRoutines)
 }
 
 func TestWaitPoolClose(t *testing.T) {
@@ -548,6 +603,7 @@ func TestWaitPoolClose(t *testing.T) {
 	if _, err := c.Do("PING"); err != nil {
 		t.Fatal(err)
 	}
+	start := time.Now()
 	errs := startGoroutines(p, "PING")
 	d.check("before close", p, 1, 1, 1)
 	p.Close()
@@ -566,7 +622,7 @@ func TestWaitPoolClose(t *testing.T) {
 		}
 	}
 	c.Close()
-	d.check("done", p, 1, 0, 0)
+	d.checkAll("done", p, 1, 0, 0, testGoRoutines, time.Since(start)*testGoRoutines)
 }
 
 func TestWaitPoolCommandError(t *testing.T) {
@@ -581,6 +637,7 @@ func TestWaitPoolCommandError(t *testing.T) {
 	defer p.Close()
 
 	c := p.Get()
+	start := time.Now()
 	errs := startGoroutines(p, "ERR", testErr)
 	d.check("before close", p, 1, 1, 1)
 	c.Close()
@@ -595,7 +652,7 @@ func TestWaitPoolCommandError(t *testing.T) {
 			t.Fatalf("timeout waiting for blocked goroutine %d", i)
 		}
 	}
-	d.check("done", p, cap(errs), 0, 0)
+	d.checkAll("done", p, cap(errs), 0, 0, testGoRoutines, time.Since(start)*testGoRoutines)
 }
 
 func TestWaitPoolDialError(t *testing.T) {
@@ -610,6 +667,7 @@ func TestWaitPoolDialError(t *testing.T) {
 	defer p.Close()
 
 	c := p.Get()
+	start := time.Now()
 	errs := startGoroutines(p, "ERR", testErr)
 	d.check("before close", p, 1, 1, 1)
 
@@ -640,7 +698,7 @@ func TestWaitPoolDialError(t *testing.T) {
 	if errCount != cap(errs)-1 {
 		t.Errorf("expected %d dial errors, got %d", cap(errs)-1, errCount)
 	}
-	d.check("done", p, cap(errs), 0, 0)
+	d.checkAll("done", p, cap(errs), 0, 0, testGoRoutines, time.Since(start)*testGoRoutines)
 }
 
 // Borrowing requires us to iterate over the idle connections, unlock the pool,