Explorar o código

Improve connection pool.

- Removed error return value from Pool.Get method. Any errors creating
  the underlying connection  are returned on the first use of the pooled
  connection. The application can call the connection's Err method if
  the application needs to know if there's a problem with the connection
  before using it.
- Added exported fields Dial, MaxIdle and IdleTimeout to Pool. The first
  two fields correspond the arguments of the now deprecated NewPool
  function.  IdleTimeout specifies how long a connection can remain idle
  in the pool before being closed.
- Added Pool Close method.
- Bonus: Add copyright notice to redisx example. Fix typo in connection
  test.
Gary Burd %!s(int64=13) %!d(string=hai) anos
pai
achega
b9d64bd42f
Modificáronse 4 ficheiros con 290 adicións e 97 borrados
  1. 1 1
      redis/conn_test.go
  2. 146 67
      redis/pool.go
  3. 129 29
      redis/pool_test.go
  4. 14 0
      redisx/example.go

+ 1 - 1
redis/conn_test.go

@@ -334,7 +334,7 @@ func TestReadDeadline(t *testing.T) {
 
 	_, err = c1.Do("PING")
 	if err == nil {
-		t.Fatalf("Dodid not return error.")
+		t.Fatalf("Do did not return error.")
 	}
 
 	c2, err := redis.DialTimeout(l.Addr().Network(), l.Addr().String(), 0, time.Millisecond, 0)

+ 146 - 67
redis/pool.go

@@ -15,12 +15,19 @@
 package redis
 
 import (
+	"container/list"
 	"errors"
+	"sync"
+	"time"
 )
 
-// Pool maintains a pool of connections.
-//
-// Pooled connections do not support concurrent access or pub/sub.
+var nowFunc = time.Now // for testing
+
+var errPoolClosed = errors.New("redigo: connection pool closed")
+
+// Pool maintains a pool of connections. The application calls the Get method
+// to get a connection from the pool and the connection's Close method to
+// return the connection's resources to the pool.
 //
 // The following example shows how to use a pool in a web application. The
 // application creates a pool at application startup and makes it available to
@@ -30,13 +37,18 @@ import (
 //      var password string
 //      ...
 //
-//      pool = redis.NewPool(func () (c redis.Conn, err error) {
-//              c, err = redis.Dial(server)
-//              if err != nil {
-//                  err = c.Do("AUTH", password)
-//              }
-//              return
-//          }, 3)
+//      pool = &redis.Pool{
+//              MaxIdle: 3,
+//              Dial: func () (c redis.Conn err error) {
+//                  c, err = redis.Dial("tcp", server)
+//                  if err != nil {
+//                      return nil, err
+//                  }
+//                  if err = c.Do("AUTH", password); err != nil {
+//                      return nil, err    
+//                  }
+//              },
+//          }
 //
 // This pool has a maximum of three connections to the server specified by the
 // variable "server". Each connection is authenticated using a password.
@@ -45,98 +57,165 @@ import (
 // when the handler is done:
 //
 //  conn, err := pool.Get()
-//  if err != nil {
-//      // handle the error
-//  }
 //  defer conn.Close()
 //  // do something with the connection
-//
-// Close() returns the connection to the pool if there's room in the pool and
-// the connection does not have a permanent error. Otherwise, Close() releases
-// the resources used by the connection.
 type Pool struct {
-	newFn func() (Conn, error)
-	conns chan Conn
+
+	// Dial is an application supplied function for creating new connections.
+	Dial func() (Conn, error)
+
+	// Maximum number of idle connections in the pool.
+	MaxIdle int
+
+	// Close connections after remaining idle for this duration. If the value
+	// is zero, then idle connections are not closed. Applications should set
+	// the timeout to a value less than the server's timeout.
+	IdleTimeout time.Duration
+
+	// mu protects fields defined below.
+	mu     sync.Mutex
+	closed bool
+
+	// Stack of idleConn with most recently used at the front.
+	idle list.List
 }
 
-type pooledConnection struct {
-	c    Conn
-	err  error
-	pool *Pool
+type idleConn struct {
+	c Conn
+	t time.Time
 }
 
-// NewPool returns a new connection pool. The pool uses newFn to create
-// connections as needed and maintains a maximum of maxIdle idle connections.
+// NewPool returns a pool that uses newPool to create connections as needed.
+// The pool keeps a maximum of maxIdle idle connections. 
 func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
-	return &Pool{newFn: newFn, conns: make(chan Conn, maxIdle)}
+	return &Pool{Dial: newFn, MaxIdle: maxIdle}
+}
+
+// Get gets a connection from the pool.
+func (p *Pool) Get() Conn {
+	return &pooledConnection{p: p}
 }
 
-// Get returns an idle connection from the pool if available or creates a new
-// connection. The caller should Close() the connection to return the
-// connection to the pool.
-func (p *Pool) Get() (Conn, error) {
-	var c Conn
-	select {
-	case c = <-p.conns:
-	default:
-		var err error
-		c, err = p.newFn()
-		if err != nil {
-			return nil, err
+// Close releases the resources used by the pool.
+func (p *Pool) Close() error {
+	p.mu.Lock()
+	idle := p.idle
+	p.idle.Init()
+	p.closed = true
+	p.mu.Unlock()
+	for e := idle.Front(); e != nil; e = e.Next() {
+		e.Value.(idleConn).c.Close()
+	}
+	return nil
+}
+
+func (p *Pool) get() (c Conn, err error) {
+	p.mu.Lock()
+	if p.IdleTimeout > 0 {
+		for {
+			e := p.idle.Back()
+			if e == nil || nowFunc().Before(e.Value.(idleConn).t) {
+				break
+			}
+			cStale := e.Value.(idleConn).c
+			p.idle.Remove(e)
+
+			// Release the pool lock during the potentially long call to the
+			// connecton's Close method.
+			p.mu.Unlock()
+			cStale.Close()
+			p.mu.Lock()
 		}
 	}
-	return &pooledConnection{c: c, pool: p}, nil
+	if p.closed {
+		p.mu.Unlock()
+		return nil, errors.New("redigo: get on closed pool")
+	}
+	if e := p.idle.Front(); e != nil {
+		c = e.Value.(idleConn).c
+		p.idle.Remove(e)
+	}
+	p.mu.Unlock()
+	if c == nil {
+		c, err = p.Dial()
+	}
+	return c, err
+}
+
+func (p *Pool) put(c Conn) error {
+	p.mu.Lock()
+	if !p.closed {
+		p.idle.PushFront(idleConn{t: nowFunc().Add(p.IdleTimeout), c: c})
+		if p.idle.Len() > p.MaxIdle {
+			c = p.idle.Remove(p.idle.Back()).(idleConn).c
+		} else {
+			c = nil
+		}
+	}
+	p.mu.Unlock()
+	if c != nil {
+		return c.Close()
+	}
+	return nil
+}
+
+type pooledConnection struct {
+	c   Conn
+	err error
+	p   *Pool
+}
+
+func (c *pooledConnection) get() error {
+	if c.err == nil && c.c == nil {
+		c.c, c.err = c.p.get()
+	}
+	return c.err
+}
+
+func (c *pooledConnection) Close() (err error) {
+	if c.c != nil {
+		if c.c.Err() != nil {
+			err = c.c.Close()
+		} else {
+			err = c.p.put(c.c)
+		}
+		c.c = nil
+		c.err = errPoolClosed
+	}
+	return err
 }
 
 func (c *pooledConnection) Err() error {
-	if c.err != nil {
-		return c.err
+	if err := c.get(); err != nil {
+		return err
 	}
 	return c.c.Err()
 }
 
 func (c *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
-	if c.err != nil {
-		return nil, c.err
+	if err := c.get(); err != nil {
+		return nil, err
 	}
 	return c.c.Do(commandName, args...)
 }
 
 func (c *pooledConnection) Send(commandName string, args ...interface{}) error {
-	if c.err != nil {
-		return c.err
+	if err := c.get(); err != nil {
+		return err
 	}
 	return c.c.Send(commandName, args...)
 }
 
 func (c *pooledConnection) Flush() error {
-	if c.err != nil {
-		return c.err
+	if err := c.get(); err != nil {
+		return err
 	}
 	return c.c.Flush()
 }
 
 func (c *pooledConnection) Receive() (reply interface{}, err error) {
-	if c.err != nil {
-		return nil, c.err
+	if err := c.get(); err != nil {
+		return nil, err
 	}
 	return c.c.Receive()
 }
-
-var errPoolClosed = errors.New("redigo: pooled connection closed")
-
-func (c *pooledConnection) Close() (err error) {
-	if c.err != nil {
-		return c.err
-	}
-	c.err = errPoolClosed
-	if c.c.Err() != nil {
-		return c.c.Close()
-	}
-	select {
-	case c.pool.conns <- c.c:
-	default:
-		return c.c.Close()
-	}
-	return nil
-}

+ 129 - 29
redis/pool_test.go

@@ -17,17 +17,21 @@ package redis
 import (
 	"io"
 	"testing"
+	"time"
 )
 
 type fakeConn struct {
-	closed bool
-	err    error
+	open *int
+	err  error
 }
 
-func (c *fakeConn) Close() error { c.closed = true; return nil }
+func (c *fakeConn) Close() error { *c.open -= 1; return nil }
 func (c *fakeConn) Err() error   { return c.err }
 
 func (c *fakeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
+	if commandName == "ERR" {
+		c.err = args[0].(error)
+	}
 	return nil, nil
 }
 
@@ -43,45 +47,141 @@ func (c *fakeConn) Receive() (reply interface{}, err error) {
 	return nil, nil
 }
 
-func TestPool(t *testing.T) {
-	var count int
-	p := NewPool(func() (Conn, error) { count += 1; return &fakeConn{}, nil }, 2)
+func TestPoolReuse(t *testing.T) {
+	var open, dialed int
+	p := &Pool{
+		MaxIdle: 2,
+		Dial:    func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
+	}
 
-	count = 0
 	for i := 0; i < 10; i++ {
-		c1, _ := p.Get()
-		c2, _ := p.Get()
+		c1 := p.Get()
+		c1.Do("PING")
+		c2 := p.Get()
+		c2.Do("PING")
 		c1.Close()
 		c2.Close()
 	}
-	if count != 2 {
-		t.Fatal("expected count 1, actual", count)
+	if open != 2 || dialed != 2 {
+		t.Errorf("want open=2, got %d; want dialed=2, got %d", open, dialed)
 	}
+}
 
-	p.Get()
-	p.Get()
-	count = 0
-	for i := 0; i < 10; i++ {
-		c, _ := p.Get()
-		c.(*pooledConnection).c.(*fakeConn).err = io.EOF
-		c.Close()
-	}
-	if count != 10 {
-		t.Fatal("expected count 10, actual", count)
+func TestPoolMaxIdle(t *testing.T) {
+	var open, dialed int
+	p := &Pool{
+		MaxIdle: 2,
+		Dial:    func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
 	}
 
-	p.Get()
-	p.Get()
-	count = 0
 	for i := 0; i < 10; i++ {
-		c1, _ := p.Get()
-		c2, _ := p.Get()
-		c3, _ := p.Get()
+		c1 := p.Get()
+		c1.Do("PING")
+		c2 := p.Get()
+		c2.Do("PING")
+		c3 := p.Get()
+		c3.Do("PING")
 		c1.Close()
 		c2.Close()
 		c3.Close()
 	}
-	if count != 12 {
-		t.Fatal("expected count 12, actual", count)
+	if open != 2 || dialed != 12 {
+		t.Errorf("want open=2, got %d; want dialed=12, got %d", open, dialed)
+	}
+}
+
+func TestPoolError(t *testing.T) {
+	var open, dialed int
+	p := &Pool{
+		MaxIdle: 2,
+		Dial:    func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
+	}
+
+	c := p.Get()
+	c.Do("ERR", io.EOF)
+	if c.Err() == nil {
+		t.Errorf("expected c.Err() != nil")
+	}
+	c.Close()
+
+	c = p.Get()
+	c.Do("ERR", io.EOF)
+	c.Close()
+
+	if open != 0 || dialed != 2 {
+		t.Errorf("want open=0, got %d; want dialed=2, got %d", open, dialed)
+	}
+}
+
+func TestPoolClose(t *testing.T) {
+	var open, dialed int
+	p := &Pool{
+		MaxIdle: 2,
+		Dial:    func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
+	}
+
+	c1 := p.Get()
+	c1.Do("PING")
+	c2 := p.Get()
+	c2.Do("PING")
+	c3 := p.Get()
+	c3.Do("PING")
+
+	c1.Close()
+	if _, err := c1.Do("PING"); err == nil {
+		t.Errorf("expected error after connection closed")
+	}
+
+	c2.Close()
+
+	p.Close()
+
+	if open != 1 {
+		t.Errorf("want open=1, got %d", open)
+	}
+
+	if _, err := c1.Do("PING"); err == nil {
+		t.Errorf("expected error after connection and pool closed")
+	}
+
+	c3.Close()
+	if open != 0 {
+		t.Errorf("want open=0, got %d", open)
+	}
+
+	c1 = p.Get()
+	if _, err := c1.Do("PING"); err == nil {
+		t.Errorf("expected error after pool closed")
+	}
+}
+
+func TestPoolTimeout(t *testing.T) {
+	var open, dialed int
+	p := &Pool{
+		MaxIdle:     2,
+		IdleTimeout: 300 * time.Second,
+		Dial:        func() (Conn, error) { open += 1; dialed += 1; return &fakeConn{open: &open}, nil },
+	}
+
+	now := time.Now()
+	nowFunc = func() time.Time { return now }
+	defer func() { nowFunc = time.Now }()
+
+	c := p.Get()
+	c.Do("PING")
+	c.Close()
+
+	if open != 1 || dialed != 1 {
+		t.Errorf("want open=1, got %d; want dialed=1, got %d", open, dialed)
+	}
+
+	now = now.Add(p.IdleTimeout)
+
+	c = p.Get()
+	c.Do("PING")
+	c.Close()
+
+	if open != 1 || dialed != 2 {
+		t.Errorf("want open=1, got %d; want dialed=2, got %d", open, dialed)
 	}
 }

+ 14 - 0
redisx/example.go

@@ -1,3 +1,17 @@
+// Copyright 2012 Gary Burd
+//
+// Licensed under the Apache License, Version 2.0 (the "License"): you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
+// under the License.
+
 // +build ignore 
 
 package main