瀏覽代碼

Add pool and logging connections.

Gary Burd 13 年之前
父節點
當前提交
56be2cf0df
共有 4 個文件被更改,包括 306 次插入56 次删除
  1. 2 2
      redis/conn.go
  2. 117 0
      redis/log.go
  3. 98 0
      redis/pool.go
  4. 89 54
      redis/reply.go

+ 2 - 2
redis/conn.go

@@ -40,7 +40,7 @@ type conn struct {
 func Dial(network, address string) (Conn, error) {
 	netConn, err := net.Dial(network, address)
 	if err != nil {
-		return nil, err
+		return nil, errors.New("Could not connect to Redis server: " + err.Error())
 	}
 	return NewConn(netConn), nil
 }
@@ -50,7 +50,7 @@ func Dial(network, address string) (Conn, error) {
 func DialTimeout(network, address string, timeout time.Duration) (Conn, error) {
 	netConn, err := net.DialTimeout(network, address, timeout)
 	if err != nil {
-		return nil, err
+		return nil, errors.New("Could not connect to Redis server: " + err.Error())
 	}
 	return NewConn(netConn), nil
 }

+ 117 - 0
redis/log.go

@@ -0,0 +1,117 @@
+// Copyright 2012 Gary Burd
+//
+// Licensed under the Apache License, Version 2.0 (the "License"): you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
+// under the License.
+
+package redis
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+)
+
+// NewLoggingConn returns a logging wrapper around a connection.
+func NewLoggingConn(conn Conn, logger *log.Logger, prefix string) Conn {
+	if prefix != "" {
+		prefix = prefix + "."
+	}
+	return &loggingConn{conn, logger, prefix}
+}
+
+type loggingConn struct {
+	Conn
+	logger *log.Logger
+	prefix string
+}
+
+func (c *loggingConn) Close() error {
+	err := c.Conn.Close()
+	var buf bytes.Buffer
+	fmt.Fprintf(&buf, "%sClose() -> (%v)", c.prefix, err)
+	c.logger.Output(2, buf.String())
+	return err
+}
+
+func (c *loggingConn) printValue(buf *bytes.Buffer, v interface{}) {
+	const chop = 32
+	switch v := v.(type) {
+	case []byte:
+		if len(v) > chop {
+			fmt.Fprintf(buf, "%q...", v[:chop])
+		} else {
+			fmt.Fprintf(buf, "%q", v)
+		}
+	case string:
+		if len(v) > chop {
+			fmt.Fprintf(buf, "%q...", v[:chop])
+		} else {
+			fmt.Fprintf(buf, "%q", v)
+		}
+	case []interface{}:
+		if len(v) == 0 {
+			buf.WriteString("[]")
+		} else {
+			sep := "["
+			fin := "]"
+			if len(v) > chop {
+				v = v[:chop]
+				fin = "...]"
+			}
+			for _, vv := range v {
+				buf.WriteString(sep)
+				c.printValue(buf, vv)
+				sep = ", "
+			}
+			buf.WriteString(fin)
+		}
+	default:
+		fmt.Fprint(buf, v)
+	}
+}
+
+func (c *loggingConn) print(method, commandName string, args []interface{}, reply interface{}, err error) {
+	var buf bytes.Buffer
+	fmt.Fprintf(&buf, "%s%s(", c.prefix, method)
+	if method != "Receive" {
+		buf.WriteString(commandName)
+		for _, arg := range args {
+			buf.WriteString(", ")
+			c.printValue(&buf, arg)
+		}
+	}
+	buf.WriteString(") -> (")
+	if method != "Send" {
+		c.printValue(&buf, reply)
+		buf.WriteString(", ")
+	}
+	fmt.Fprintf(&buf, "%v)", err)
+	c.logger.Output(3, buf.String())
+}
+
+func (c *loggingConn) Do(commandName string, args ...interface{}) (interface{}, error) {
+	reply, err := c.Conn.Do(commandName, args...)
+	c.print("Do", commandName, args, reply, err)
+	return reply, err
+}
+
+func (c *loggingConn) Send(commandName string, args ...interface{}) error {
+	err := c.Conn.Send(commandName, args...)
+	c.print("Send", commandName, args, nil, err)
+	return err
+}
+
+func (c *loggingConn) Receive() (interface{}, error) {
+	reply, err := c.Conn.Receive()
+	c.print("Receive", "", nil, reply, err)
+	return reply, err
+}

+ 98 - 0
redis/pool.go

@@ -0,0 +1,98 @@
+// Copyright 2012 Gary Burd
+//
+// Licensed under the Apache License, Version 2.0 (the "License"): you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
+// under the License.
+
+package redis
+
+// Pool maintains a pool of connections.
+//
+// The following example shows how to use a pool in a web application. The
+// application creates a pool at application startup and makes it available to
+// request handlers, possibly using a global variable:
+//
+//      var server string           // host:port of server
+//      var password string
+//      ...
+//
+//      pool = redis.NewPool(func () (c redis.Conn, err error) {
+//              c, err = redis.Dial(server)
+//              if err != nil {
+//                  err = c.Do("AUTH", password)
+//              }
+//              return
+//          }, 3)
+//
+// This pool has a maximum of three connections to the server specified by the
+// variable "server". Each connection is authenticated using a password.
+//
+// A request handler gets a connection from the pool and closes the connection
+// when the handler is done:
+//
+//  conn, err := pool.Get()
+//  if err != nil {
+//      // handle the error
+//  }
+//  defer conn.Close()
+//  // do something with the connection
+//
+// Close() returns the connection to the pool if there's room in the pool and
+// the connection does not have a permanent error. Otherwise, Close() releases
+// the resources used by the connection.
+type Pool struct {
+	newFn func() (Conn, error)
+	conns chan Conn
+}
+
+type pooledConnection struct {
+	Conn
+	pool *Pool
+}
+
+// NewPool returns a new connection pool. The pool uses newFn to create
+// connections as needed and maintains a maximum of maxIdle idle connections.
+func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
+	return &Pool{newFn: newFn, conns: make(chan Conn, maxIdle)}
+}
+
+// Get returns an idle connection from the pool if available or creates a new
+// connection. The caller should Close() the connection to return the
+// connection to the pool.
+func (p *Pool) Get() (Conn, error) {
+	var c Conn
+	select {
+	case c = <-p.conns:
+	default:
+		var err error
+		c, err = p.newFn()
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &pooledConnection{Conn: c, pool: p}, nil
+}
+
+func (c *pooledConnection) Close() error {
+	if c.Conn == nil {
+		return nil
+	}
+	if c.Err() != nil {
+		return nil
+	}
+	select {
+	case c.pool.conns <- c.Conn:
+	default:
+		c.Conn.Close()
+	}
+	c.Conn = nil
+	return nil
+}

+ 89 - 54
redis/reply.go

@@ -16,17 +16,41 @@ package redis
 
 import (
 	"errors"
+	"fmt"
 	"strconv"
 )
 
-var (
-	errUnexpectedReplyType = errors.New("redigo: unexpected reply type")
-)
+var ErrNil = errors.New("redigo: nil returned")
 
-// Int is a helper that converts a Redis reply to an int. Integer replies are
-// returned directly. Bulk replies are interpreted as signed decimal strings.
-// If err is not equal to nil or the reply is not an integer or bulk value,
-// then Int returns an error.
+func Values(multiBulk []interface{}, values ...interface{}) ([]interface{}, error) {
+	if len(multiBulk) < len(values) {
+		return nil, errors.New("redigo Values: short multibulk")
+	}
+	var err error
+	for i, value := range values {
+		bulk := multiBulk[i]
+		if bulk != nil {
+			switch value := value.(type) {
+			case *string:
+				*value, err = String(bulk, nil)
+			case *int:
+				*value, err = Int(bulk, nil)
+			case *bool:
+				*value, err = Bool(bulk, nil)
+			case *[]byte:
+				*value, err = Bytes(bulk, nil)
+			default:
+				panic("Value type not supported")
+			}
+			if err != nil {
+				break
+			}
+		}
+	}
+	return multiBulk[len(values):], err
+}
+
+// Int is a helper that converts a Redis reply to an int.
 func Int(v interface{}, err error) (int, error) {
 	if err != nil {
 		return 0, err
@@ -37,70 +61,91 @@ func Int(v interface{}, err error) (int, error) {
 	case []byte:
 		n, err := strconv.ParseInt(string(v), 10, 0)
 		return int(n), err
+	case nil:
+		return 0, ErrNil
 	case Error:
 		return 0, v
 	}
-	return 0, errUnexpectedReplyType
+	return 0, fmt.Errorf("redigo: unexpected type for Int, got type %T", v)
 }
 
-// String is a helper that converts a Redis reply to a string. Bulk replies are
-// returned as a string. Integer replies are formatted as as a signed decimal
-// string. If err is not equal to nil or the reply is not an integer or bulk
-// value, then Int returns an error.
+// String is a helper that converts a Redis reply to a string. 
 func String(v interface{}, err error) (string, error) {
 	if err != nil {
 		return "", err
 	}
 	switch v := v.(type) {
-	case int64:
-		return strconv.FormatInt(v, 10), nil
 	case []byte:
 		return string(v), nil
+	case int64:
+		return strconv.FormatInt(v, 10), nil
+	case nil:
+		return "", ErrNil
 	case Error:
 		return "", v
 	}
-	return "", errUnexpectedReplyType
+	panic("FOOBAR")
+	return "", fmt.Errorf("redigo: unexpected type for String, got type %T", v)
 }
 
-// Bytes is a helper that converts a Redis reply to slice of bytes.  Bulk
-// replies are returned as is. Integer replies are formatted as as a signed
-// decimal string. If err is not equal to nil or the reply is not an integer
-// or bulk value, then Int returns an error.
+// Bytes is a helper that converts a Redis reply to slice of bytes. 
 func Bytes(v interface{}, err error) ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
 	switch v := v.(type) {
-	case int64:
-		return strconv.AppendInt(nil, v, 10), nil
 	case []byte:
 		return v, nil
+	case int64:
+		return strconv.AppendInt(nil, v, 10), nil
+	case nil:
+		return nil, ErrNil
 	case Error:
 		return nil, v
 	}
-	return nil, errUnexpectedReplyType
+	return nil, fmt.Errorf("redigo: unexpected type for Bytes, got type %T", v)
 }
 
-// Bool is a helper that converts a Redis reply eo a bool. Bool returns true if
-// the reply is the integer 1 and false if the reply is the integer 0.  If err
-// is not equal to nil or the reply is not the integer 0 or 1, then Bool
-// returns an error.
+// Bool is a helper that converts a Redis reply to a bool. Bool converts the
+// integer 0 and the bulk values "0" and "" to false. All other integer and
+// bulk values are converted to true. If the reply is not an integer or bulk
+// value or err is not equal to nil, then Bool returns an error.
 func Bool(v interface{}, err error) (bool, error) {
 	if err != nil {
 		return false, err
 	}
 	switch v := v.(type) {
 	case int64:
-		switch v {
-		case 0:
+		return v != 0, nil
+	case []byte:
+		if len(v) == 0 || (len(v) == 1 && v[0] == '0') {
 			return false, nil
-		case 1:
-			return true, nil
 		}
+		return true, nil
+	case nil:
+		return false, ErrNil
 	case Error:
 		return false, v
 	}
-	return false, errUnexpectedReplyType
+	return false, fmt.Errorf("redigo: unexpected type for Bool, got type %T", v)
+}
+
+// MultiBulk is a helper that converts a Redis reply to a []interface{}. If err
+// is not equal to nil or the reply is not a multi-bulk reply, then MultiBulk
+// returns an error.
+func MultiBulk(v interface{}, err error) ([]interface{}, error) {
+	if err != nil {
+		return nil, err
+	}
+	switch v := v.(type) {
+	case []interface{}:
+		return v, nil
+	case nil:
+		return nil, ErrNil
+	case Error:
+		return nil, v
+	}
+	return nil, fmt.Errorf("redigo: unexpected type for MultiBulk, got type %T", v)
 }
 
 // Subscribe represents a subscribe or unsubscribe notification.
@@ -128,39 +173,29 @@ type Message struct {
 
 // Notification is a helper that returns a pub/sub notification as a
 // Subscription or a Message.
-func Notification(v interface{}, err error) (interface{}, error) {
+func Notification(reply interface{}, err error) (interface{}, error) {
+	multiBulk, err := MultiBulk(reply, err)
 	if err != nil {
 		return nil, err
 	}
-	err = errUnexpectedReplyType
-	s, ok := v.([]interface{})
-	if !ok || len(s) != 3 {
-		return nil, errUnexpectedReplyType
-	}
-	b, ok := s[0].([]byte)
-	if !ok {
-		return nil, errUnexpectedReplyType
-	}
-	kind := string(b)
 
-	b, ok = s[1].([]byte)
-	if !ok {
-		return nil, errUnexpectedReplyType
+	var kind, channel string
+	multiBulk, err = Values(multiBulk, &kind, &channel)
+	if err != nil {
+		return nil, err
 	}
-	channel := string(b)
 
 	if kind == "message" {
-		data, ok := s[2].([]byte)
-		if !ok {
-			return nil, errUnexpectedReplyType
+		var data []byte
+		if _, err := Values(multiBulk, &data); err != nil {
+			return nil, err
 		}
 		return Message{channel, data}, nil
 	}
 
-	count, ok := s[2].(int64)
-	if !ok {
-		return nil, errUnexpectedReplyType
+	var count int
+	if _, err := Values(multiBulk, &count); err != nil {
+		return nil, err
 	}
-
-	return Subscription{kind, channel, int(count)}, nil
+	return Subscription{kind, channel, count}, nil
 }