소스 검색

Add max active connection limit to Pool.

Gary Burd 12 년 전
부모
커밋
19275c5e42
4개의 변경된 파일139개의 추가작업 그리고 54개의 파일을 삭제
  1. 10 0
      README.markdown
  2. 1 1
      redis/conn.go
  3. 47 17
      redis/pool.go
  4. 81 36
      redis/pool_test.go

+ 10 - 0
README.markdown

@@ -29,6 +29,16 @@ Install Redigo using the "go get" command:
 
 The Go distribution is Redigo's only dependency.
 
+Contributing
+------------
+
+Contributions are welcome. 
+
+Before writing code, send mail to gary@beagledreams.com to discuss what you
+plan to do. This gives me a chance to validate the design, avoid duplication of
+effort and ensure that the changes fit the goals of the project. Do not start
+the discussion with a pull request. 
+
 License
 -------
 

+ 1 - 1
redis/conn.go

@@ -47,7 +47,7 @@ type conn struct {
 	// '*' or '$', length, "\r\n"
 	lenScratch [1 + 19 + 2]byte
 
-	// Scratch space for formatting integers.
+	// Scratch space for formatting integers and floats.
 	numScratch [40]byte
 }
 

+ 47 - 17
redis/pool.go

@@ -23,6 +23,7 @@ import (
 
 var nowFunc = time.Now // for testing
 
+var ErrPoolExhausted = errors.New("redigo: connection pool exhausted")
 var errPoolClosed = errors.New("redigo: connection pool closed")
 
 // Pool maintains a pool of connections. The application calls the Get method
@@ -81,6 +82,10 @@ type Pool struct {
 	// Maximum number of idle connections in the pool.
 	MaxIdle int
 
+	// Maximum number of connections allocated by the pool at a given time.
+	// When zero, there is no limit on the number of connections in the pool.
+	MaxActive int
+
 	// Close connections after remaining idle for this duration. If the value
 	// is zero, then idle connections are not closed. Applications should set
 	// the timeout to a value less than the server's timeout.
@@ -89,6 +94,7 @@ type Pool struct {
 	// mu protects fields defined below.
 	mu     sync.Mutex
 	closed bool
+	active int
 
 	// Stack of idleConn with most recently used at the front.
 	idle list.List
@@ -110,12 +116,21 @@ func (p *Pool) Get() Conn {
 	return &pooledConnection{p: p}
 }
 
+// ActiveCount returns the number of active connections in the pool.
+func (p *Pool) ActiveCount() int {
+	p.mu.Lock()
+	active := p.active
+	p.mu.Unlock()
+	return active
+}
+
 // Close releases the resources used by the pool.
 func (p *Pool) Close() error {
 	p.mu.Lock()
 	idle := p.idle
 	p.idle.Init()
 	p.closed = true
+	p.active -= idle.Len()
 	p.mu.Unlock()
 	for e := idle.Front(); e != nil; e = e.Next() {
 		e.Value.(idleConn).c.Close()
@@ -146,6 +161,7 @@ func (p *Pool) get() (Conn, error) {
 				break
 			}
 			p.idle.Remove(e)
+			p.active -= 1
 			p.mu.Unlock()
 			ic.c.Close()
 			p.mu.Lock()
@@ -163,33 +179,51 @@ func (p *Pool) get() (Conn, error) {
 		p.idle.Remove(e)
 		test := p.TestOnBorrow
 		p.mu.Unlock()
-		if test != nil && test(ic.c, ic.t) != nil {
-			ic.c.Close()
-		} else {
+		if test == nil || test(ic.c, ic.t) == nil {
 			return ic.c, nil
 		}
+		ic.c.Close()
 		p.mu.Lock()
+		p.active -= 1
+	}
+
+	if p.MaxActive > 0 && p.active >= p.MaxActive {
+		p.mu.Unlock()
+		return nil, ErrPoolExhausted
 	}
 
 	// No idle connection, create new.
 
 	dial := p.Dial
+	p.active += 1
 	p.mu.Unlock()
-	return dial()
+	c, err := dial()
+	if err != nil {
+		p.mu.Lock()
+		p.active -= 1
+		p.mu.Unlock()
+		c = nil
+	}
+	return c, err
 }
 
 func (p *Pool) put(c Conn) error {
-	p.mu.Lock()
-	if !p.closed {
-		p.idle.PushFront(idleConn{t: nowFunc(), c: c})
-		if p.idle.Len() > p.MaxIdle {
-			c = p.idle.Remove(p.idle.Back()).(idleConn).c
-		} else {
-			c = nil
+	if c.Err() == nil {
+		p.mu.Lock()
+		if !p.closed {
+			p.idle.PushFront(idleConn{t: nowFunc(), c: c})
+			if p.idle.Len() > p.MaxIdle {
+				c = p.idle.Remove(p.idle.Back()).(idleConn).c
+			} else {
+				c = nil
+			}
 		}
+		p.mu.Unlock()
 	}
-	p.mu.Unlock()
 	if c != nil {
+		p.mu.Lock()
+		p.active -= 1
+		p.mu.Unlock()
 		return c.Close()
 	}
 	return nil
@@ -211,11 +245,7 @@ func (c *pooledConnection) get() error {
 func (c *pooledConnection) Close() (err error) {
 	if c.c != nil {
 		c.c.Do("")
-		if c.c.Err() != nil {
-			err = c.c.Close()
-		} else {
-			err = c.p.put(c.c)
-		}
+		c.p.put(c.c)
 		c.c = nil
 		c.err = errPoolClosed
 	}

+ 81 - 36
redis/pool_test.go

@@ -47,11 +47,34 @@ func (c *fakeConn) Receive() (reply interface{}, err error) {
 	return nil, nil
 }
 
+type dialer struct {
+	t            *testing.T
+	dialed, open int
+}
+
+func (d *dialer) dial() (Conn, error) {
+	d.open += 1
+	d.dialed += 1
+	return &fakeConn{open: &d.open}, nil
+}
+
+func (d *dialer) check(message string, p *Pool, dialed, open int) {
+	if d.dialed != dialed {
+		d.t.Errorf("%s: dialed=%d, want %d", message, d.dialed, dialed)
+	}
+	if d.open != open {
+		d.t.Errorf("%s: open=%d, want %d", message, d.open, open)
+	}
+	if active := p.ActiveCount(); active != open {
+		d.t.Errorf("%s: active=%d, want %d", message, active, open)
+	}
+}
+
 func TestPoolReuse(t *testing.T) {
-	var open, dialed int
+	d := dialer{t: t}
 	p := &Pool{
 		MaxIdle: 2,
-		Dial:    func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
+		Dial:    d.dial,
 	}
 
 	for i := 0; i < 10; i++ {
@@ -62,18 +85,18 @@ func TestPoolReuse(t *testing.T) {
 		c1.Close()
 		c2.Close()
 	}
-	if open != 2 || dialed != 2 {
-		t.Errorf("want open=2, got %d; want dialed=2, got %d", open, dialed)
-	}
+
+	d.check("before close", p, 2, 2)
+	p.Close()
+	d.check("after close", p, 2, 0)
 }
 
 func TestPoolMaxIdle(t *testing.T) {
-	var open, dialed int
+	d := dialer{t: t}
 	p := &Pool{
 		MaxIdle: 2,
-		Dial:    func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
+		Dial:    d.dial,
 	}
-
 	for i := 0; i < 10; i++ {
 		c1 := p.Get()
 		c1.Do("PING")
@@ -85,16 +108,16 @@ func TestPoolMaxIdle(t *testing.T) {
 		c2.Close()
 		c3.Close()
 	}
-	if open != 2 || dialed != 12 {
-		t.Errorf("want open=2, got %d; want dialed=12, got %d", open, dialed)
-	}
+	d.check("before close", p, 12, 2)
+	p.Close()
+	d.check("after close", p, 12, 0)
 }
 
 func TestPoolError(t *testing.T) {
-	var open, dialed int
+	d := dialer{t: t}
 	p := &Pool{
 		MaxIdle: 2,
-		Dial:    func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
+		Dial:    d.dial,
 	}
 
 	c := p.Get()
@@ -108,16 +131,14 @@ func TestPoolError(t *testing.T) {
 	c.Do("ERR", io.EOF)
 	c.Close()
 
-	if open != 0 || dialed != 2 {
-		t.Errorf("want open=0, got %d; want dialed=2, got %d", open, dialed)
-	}
+	d.check(".", p, 2, 0)
 }
 
 func TestPoolClose(t *testing.T) {
-	var open, dialed int
+	d := dialer{t: t}
 	p := &Pool{
 		MaxIdle: 2,
-		Dial:    func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
+		Dial:    d.dial,
 	}
 
 	c1 := p.Get()
@@ -136,18 +157,15 @@ func TestPoolClose(t *testing.T) {
 
 	p.Close()
 
-	if open != 1 {
-		t.Errorf("want open=1, got %d", open)
-	}
+	d.check("after pool close", p, 3, 1)
 
 	if _, err := c1.Do("PING"); err == nil {
 		t.Errorf("expected error after connection and pool closed")
 	}
 
 	c3.Close()
-	if open != 0 {
-		t.Errorf("want open=0, got %d", open)
-	}
+
+	d.check("after channel close", p, 3, 0)
 
 	c1 = p.Get()
 	if _, err := c1.Do("PING"); err == nil {
@@ -156,11 +174,11 @@ func TestPoolClose(t *testing.T) {
 }
 
 func TestPoolTimeout(t *testing.T) {
-	var open, dialed int
+	d := dialer{t: t}
 	p := &Pool{
 		MaxIdle:     2,
 		IdleTimeout: 300 * time.Second,
-		Dial:        func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
+		Dial:        d.dial,
 	}
 
 	now := time.Now()
@@ -171,9 +189,7 @@ func TestPoolTimeout(t *testing.T) {
 	c.Do("PING")
 	c.Close()
 
-	if open != 1 || dialed != 1 {
-		t.Errorf("want open=1, got %d; want dialed=1, got %d", open, dialed)
-	}
+	d.check("1", p, 1, 1)
 
 	now = now.Add(p.IdleTimeout)
 
@@ -181,16 +197,14 @@ func TestPoolTimeout(t *testing.T) {
 	c.Do("PING")
 	c.Close()
 
-	if open != 1 || dialed != 2 {
-		t.Errorf("want open=1, got %d; want dialed=2, got %d", open, dialed)
-	}
+	d.check("2", p, 2, 1)
 }
 
 func TestBorrowCheck(t *testing.T) {
-	var open, dialed int
+	d := dialer{t: t}
 	p := &Pool{
 		MaxIdle:      2,
-		Dial:         func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
+		Dial:         d.dial,
 		TestOnBorrow: func(Conn, time.Time) error { return Error("BLAH") },
 	}
 
@@ -199,7 +213,38 @@ func TestBorrowCheck(t *testing.T) {
 		c.Do("PING")
 		c.Close()
 	}
-	if open != 1 || dialed != 10 {
-		t.Errorf("want open=1, got %d; want dialed=10, got %d", open, dialed)
+	d.check("1", p, 10, 1)
+}
+
+func TestMaxActive(t *testing.T) {
+	d := dialer{t: t}
+	p := &Pool{
+		MaxIdle:   2,
+		MaxActive: 2,
+		Dial:      d.dial,
 	}
+	c1 := p.Get()
+	c1.Do("PING")
+	c2 := p.Get()
+	c2.Do("PING")
+
+	d.check("1", p, 2, 2)
+
+	c3 := p.Get()
+	if _, err := c3.Do("PING"); err != ErrPoolExhausted {
+		t.Errorf("expected pool exhausted")
+	}
+
+	c3.Close()
+	d.check("2", p, 2, 2)
+	c2.Close()
+	d.check("2", p, 2, 2)
+
+	c3 = p.Get()
+	if _, err := c3.Do("PING"); err != nil {
+		t.Errorf("expected good channel, err=%v", err)
+	}
+	c3.Close()
+
+	d.check("2", p, 2, 2)
 }