Explorar el Código

Improve pub/sub support.

- Add new Conn Flush method to flush buffered commands. Do not flush in
  Receive method. This allows an application to manage subscriptions and
  received pushed notifications in different goroutines.
- Add PubSubConn wrapper type for subscribers.
- Remove Notification helper function.
- Bonus: improve error handling on pooled connections.
Gary Burd hace 13 años
padre
commit
24a6a81341
Se han modificado 13 ficheros con 665 adiciones y 326 borrados
  1. 1 1
      README.markdown
  2. 132 92
      redis/conn.go
  3. 12 9
      redis/conn_test.go
  4. 124 0
      redis/doc.go
  5. 54 10
      redis/pool.go
  6. 87 0
      redis/pool_test.go
  7. 111 0
      redis/pubsub.go
  8. 128 0
      redis/pubsub_test.go
  9. 5 99
      redis/redis.go
  10. 0 52
      redis/reply.go
  11. 0 62
      redis/reply_test.go
  12. 10 0
      redis/script_test.go
  13. 1 1
      redis/test_test.go

+ 1 - 1
README.markdown

@@ -6,7 +6,7 @@ Redigo is a [Go](http://golang.org/) client for the [Redis](http://redis.io/) da
 Features
 -------
 
-* An [fmt.Print-like](http://go.pkgdoc.org/github.com/garyburd/redigo/redis#Executing_Commands) API with support for all Redis commands.
+* A [Print-like](http://go.pkgdoc.org/github.com/garyburd/redigo/redis#Executing_Commands) API with support for all Redis commands.
 * [Pipelining](http://go.pkgdoc.org/github.com/garyburd/redigo/redis#Pipelining), including pipelined transactions.
 * [Publish/Subscribe](http://go.pkgdoc.org/github.com/garyburd/redigo/redis#Publish_and_Subscribe).
 * [Connection pooling](http://go.pkgdoc.org/github.com/garyburd/redigo/redis#Pool).

+ 132 - 92
redis/conn.go

@@ -28,41 +28,56 @@ import (
 
 // conn is the low-level implementation of Conn
 type conn struct {
-	rw      bufio.ReadWriter
-	conn    net.Conn
-	scratch []byte
-	pending int
+	conn net.Conn
+
+	// Read
+	readTimeout time.Duration
+	br          *bufio.Reader
+	scratch     []byte
+
+	// Write
+	writeTimeout time.Duration
+	bw           *bufio.Writer
+
+	// Shared
 	mu      sync.Mutex
+	pending int
 	err     error
 }
 
 // Dial connects to the Redis server at the given network and address.
 func Dial(network, address string) (Conn, error) {
-	netConn, err := net.Dial(network, address)
+	c, err := net.Dial(network, address)
 	if err != nil {
 		return nil, errors.New("Could not connect to Redis server: " + err.Error())
 	}
-	return NewConn(netConn), nil
+	return NewConn(c, 0, 0), nil
 }
 
-// DialTimeout acts like Dial but takes a timeout. The timeout includes name
-// resolution, if required.
-func DialTimeout(network, address string, timeout time.Duration) (Conn, error) {
-	netConn, err := net.DialTimeout(network, address, timeout)
+// DialTimeout acts like Dial but takes timeouts for establishing the
+// connection to the server, write a command and reading a reply.
+func DialTimeout(network, address string, connectTimeout, readTimeout, writeTimeout time.Duration) (Conn, error) {
+	var c net.Conn
+	var err error
+	if connectTimeout > 0 {
+		c, err = net.DialTimeout(network, address, connectTimeout)
+	} else {
+		c, err = net.Dial(network, address)
+	}
 	if err != nil {
 		return nil, errors.New("Could not connect to Redis server: " + err.Error())
 	}
-	return NewConn(netConn), nil
+	return NewConn(c, readTimeout, writeTimeout), nil
 }
 
 // NewConn returns a new Redigo connection for the given net connection.
-func NewConn(netConn net.Conn) Conn {
+func NewConn(netConn net.Conn, readTimeout, writeTimeout time.Duration) Conn {
 	return &conn{
-		conn: netConn,
-		rw: bufio.ReadWriter{
-			bufio.NewReader(netConn),
-			bufio.NewWriter(netConn),
-		},
+		conn:         netConn,
+		bw:           bufio.NewWriter(netConn),
+		br:           bufio.NewReader(netConn),
+		readTimeout:  readTimeout,
+		writeTimeout: writeTimeout,
 	}
 }
 
@@ -76,13 +91,6 @@ func (c *conn) Close() error {
 	return err
 }
 
-func (c *conn) Err() error {
-	c.mu.Lock()
-	err := c.err
-	c.mu.Unlock()
-	return err
-}
-
 func (c *conn) fatal(err error) error {
 	c.mu.Lock()
 	if c.err == nil {
@@ -92,54 +100,66 @@ func (c *conn) fatal(err error) error {
 	return err
 }
 
-func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
-	if err := c.Send(cmd, args...); err != nil {
-		return nil, err
-	}
-	var reply interface{}
-	var err = c.err
-	for c.pending > 0 && c.err == nil {
-		var e error
-		reply, e = c.Receive()
-		if e != nil && err == nil {
-			err = e
-		}
-	}
-	return reply, err
+func (c *conn) Err() error {
+	c.mu.Lock()
+	err := c.err
+	c.mu.Unlock()
+	return err
 }
 
 func (c *conn) writeN(prefix byte, n int) error {
 	c.scratch = append(c.scratch[0:0], prefix)
 	c.scratch = strconv.AppendInt(c.scratch, int64(n), 10)
 	c.scratch = append(c.scratch, "\r\n"...)
-	_, err := c.rw.Write(c.scratch)
+	_, err := c.bw.Write(c.scratch)
 	return err
 }
 
 func (c *conn) writeString(s string) error {
-	if err := c.writeN('$', len(s)); err != nil {
-		return err
-	}
-	if _, err := c.rw.WriteString(s); err != nil {
-		return err
-	}
-	_, err := c.rw.WriteString("\r\n")
+	c.writeN('$', len(s))
+	c.bw.WriteString(s)
+	_, err := c.bw.WriteString("\r\n")
 	return err
 }
 
 func (c *conn) writeBytes(p []byte) error {
-	if err := c.writeN('$', len(p)); err != nil {
-		return err
-	}
-	if _, err := c.rw.Write(p); err != nil {
-		return err
+	c.writeN('$', len(p))
+	c.bw.Write(p)
+	_, err := c.bw.WriteString("\r\n")
+	return err
+}
+
+func (c *conn) writeCommand(cmd string, args []interface{}) (err error) {
+	c.writeN('*', 1+len(args))
+	err = c.writeString(cmd)
+	for _, arg := range args {
+		if err != nil {
+			break
+		}
+		switch arg := arg.(type) {
+		case string:
+			err = c.writeString(arg)
+		case []byte:
+			err = c.writeBytes(arg)
+		case bool:
+			if arg {
+				err = c.writeString("1")
+			} else {
+				err = c.writeString("0")
+			}
+		case nil:
+			err = c.writeString("")
+		default:
+			var buf bytes.Buffer
+			fmt.Fprint(&buf, arg)
+			err = c.writeBytes(buf.Bytes())
+		}
 	}
-	_, err := c.rw.WriteString("\r\n")
 	return err
 }
 
 func (c *conn) readLine() ([]byte, error) {
-	p, err := c.rw.ReadSlice('\n')
+	p, err := c.br.ReadSlice('\n')
 	if err == bufio.ErrBufferFull {
 		return nil, errors.New("redigo: long response line")
 	}
@@ -153,7 +173,7 @@ func (c *conn) readLine() ([]byte, error) {
 	return p[:i], nil
 }
 
-func (c *conn) parseReply() (interface{}, error) {
+func (c *conn) readReply() (interface{}, error) {
 	line, err := c.readLine()
 	if err != nil {
 		return nil, err
@@ -178,7 +198,7 @@ func (c *conn) parseReply() (interface{}, error) {
 			return nil, err
 		}
 		p := make([]byte, n)
-		_, err = io.ReadFull(c.rw, p)
+		_, err = io.ReadFull(c.br, p)
 		if err != nil {
 			return nil, err
 		}
@@ -197,7 +217,7 @@ func (c *conn) parseReply() (interface{}, error) {
 		}
 		r := make([]interface{}, n)
 		for i := range r {
-			r[i], err = c.parseReply()
+			r[i], err = c.readReply()
 			if err != nil {
 				return nil, err
 			}
@@ -208,53 +228,73 @@ func (c *conn) parseReply() (interface{}, error) {
 }
 
 func (c *conn) Send(cmd string, args ...interface{}) error {
-	if err := c.writeN('*', 1+len(args)); err != nil {
-		return c.fatal(err)
+	if c.writeTimeout != 0 {
+		c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
 	}
-
-	if err := c.writeString(cmd); err != nil {
+	if err := c.writeCommand(cmd, args); err != nil {
 		return c.fatal(err)
 	}
+	c.mu.Lock()
+	c.pending += 1
+	c.mu.Unlock()
+	return nil
+}
 
-	for _, arg := range args {
-		var err error
-		switch arg := arg.(type) {
-		case string:
-			err = c.writeString(arg)
-		case []byte:
-			err = c.writeBytes(arg)
-		case bool:
-			if arg {
-				err = c.writeString("1")
-			} else {
-				err = c.writeString("0")
-			}
-		case nil:
-			err = c.writeString("")
-		default:
-			var buf bytes.Buffer
-			fmt.Fprint(&buf, arg)
-			err = c.writeBytes(buf.Bytes())
-		}
-		if err != nil {
-			return c.fatal(err)
-		}
+func (c *conn) Flush() error {
+	if c.writeTimeout != 0 {
+		c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
+	}
+	if err := c.bw.Flush(); err != nil {
+		return c.fatal(err)
 	}
-	c.pending += 1
 	return nil
 }
 
-func (c *conn) Receive() (interface{}, error) {
-	c.pending -= 1
-	if err := c.rw.Flush(); err != nil {
-		return nil, c.fatal(err)
+func (c *conn) Receive() (reply interface{}, err error) {
+	c.mu.Lock()
+	// There can be more receives than sends when using pub/sub. To allow
+	// normal use of the connection after unsubscribe from all channels, do not
+	// decrement pending to a negative value.
+	if c.pending > 0 {
+		c.pending -= 1
 	}
-	v, err := c.parseReply()
-	if err != nil {
+	c.mu.Unlock()
+	if c.readTimeout != 0 {
+		c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
+	}
+	if reply, err = c.readReply(); err != nil {
 		return nil, c.fatal(err)
 	}
-	if err, ok := v.(Error); ok {
+	if err, ok := reply.(Error); ok {
 		return nil, err
 	}
-	return v, nil
+	return
+}
+
+func (c *conn) Do(cmd string, args ...interface{}) (reply interface{}, err error) {
+	// Send
+	if c.writeTimeout != 0 {
+		c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
+	}
+	c.writeCommand(cmd, args)
+	if err = c.bw.Flush(); err != nil {
+		return nil, c.fatal(err)
+	}
+
+	c.mu.Lock()
+	pending := c.pending
+	c.pending = 0
+	c.mu.Unlock()
+
+	// Receive
+	for ; pending >= 0; pending-- {
+		var e error
+		if reply, e = c.readReply(); e != nil {
+			return nil, c.fatal(e)
+		}
+		if e, ok := reply.(Error); ok && err == nil {
+			err = e
+		}
+	}
+	return
 }

+ 12 - 9
redis/conn_test.go

@@ -22,9 +22,10 @@ import (
 	"reflect"
 	"strings"
 	"testing"
+	"time"
 )
 
-var sendTests = []struct {
+var writeTests = []struct {
 	args     []interface{}
 	expected string
 }{
@@ -50,8 +51,8 @@ var sendTests = []struct {
 	},
 }
 
-func TestSend(t *testing.T) {
-	for _, tt := range sendTests {
+func TestWrite(t *testing.T) {
+	for _, tt := range writeTests {
 		var buf bytes.Buffer
 		rw := bufio.ReadWriter{Writer: bufio.NewWriter(&buf)}
 		c := redis.NewConnBufio(rw)
@@ -70,7 +71,7 @@ func TestSend(t *testing.T) {
 
 var errorSentinel = &struct{}{}
 
-var receiveTests = []struct {
+var readTests = []struct {
 	reply    string
 	expected interface{}
 }{
@@ -112,8 +113,8 @@ var receiveTests = []struct {
 	},
 }
 
-func TestReceive(t *testing.T) {
-	for _, tt := range receiveTests {
+func TestRead(t *testing.T) {
+	for _, tt := range readTests {
 		rw := bufio.ReadWriter{
 			Reader: bufio.NewReader(strings.NewReader(tt.reply)),
 			Writer: bufio.NewWriter(nil), // writer need to support Flush
@@ -153,7 +154,7 @@ func (t testConn) Close() error {
 }
 
 func dial() (redis.Conn, error) {
-	c, err := redis.Dial("tcp", ":6379")
+	c, err := redis.DialTimeout("tcp", ":6379", 0, 1*time.Second, 1*time.Second)
 	if err != nil {
 		return nil, err
 	}
@@ -265,12 +266,14 @@ func TestPipelineCommands(t *testing.T) {
 	defer c.Close()
 
 	for _, cmd := range testCommands {
-		err := c.Send(cmd.args[0].(string), cmd.args[1:]...)
-		if err != nil {
+		if err := c.Send(cmd.args[0].(string), cmd.args[1:]...); err != nil {
 			t.Errorf("Send(%v) returned error %v", cmd.args, err)
 			continue
 		}
 	}
+	if err := c.Flush(); err != nil {
+		t.Errorf("Flush() returned error %v", err)
+	}
 	for _, cmd := range testCommands {
 		actual, err := c.Receive()
 		if err != nil {

+ 124 - 0
redis/doc.go

@@ -0,0 +1,124 @@
+// Copyright 2012 Gary Burd
+//
+// Licensed under the Apache License, Version 2.0 (the "License"): you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
+// under the License.
+
+// Package redis is a client for the Redis database.
+//
+// Package redis only supports the binary-safe Redis protocol, so you can use
+// it with any Redis version >= 1.2.0.
+//
+// Connections
+//
+// The Conn interface is the primary interface for working with Redis.
+// Applications create connections by calling the Dial, DialWithTimeout or
+// NewConn functions. In the future, functions will be added for creating
+// shareded and other types of connections.
+//
+// The application must call the connection Close method when the application
+// is done with the connection.
+//
+// Executing Commands
+//
+// The Conn interface has a generic method for executing Redis commands:
+//
+//  Do(commandName string, args ...interface{}) (reply interface{}, err error)
+//
+// Arguments of type string and []byte are sent to the server as is. The value
+// false is converted to "0" and the value true is converted to "1". The value
+// nil is converted to "". All other values are converted to a string using the
+// fmt.Fprint function. Command replies are represented using the following Go
+// types:
+//
+//  Redis type          Go type
+//  error               redis.Error
+//  integer             int64
+//  status              string
+//  bulk                []byte or nil if value not present.
+//  multi-bulk          []interface{} or nil if value not present.
+// 
+// Applications can use type assertions or type switches to determine the type
+// of a reply.
+//
+// Pipelining
+//
+// Connections support pipelining using the Send, Flush and Receive methods. 
+//
+//  Send(commandName string, args ...interface{}) error
+//  Flush() error
+//  Receive() (reply interface{}, err error)
+//
+// Send writes the command to the connection's output buffer. Flush flushes the
+// connection's output buffer to the server. Receive reads a single reply from
+// the server. The following example shows a simple pipeline.
+//
+//  c.Send("SET", "foo", "bar")
+//  c.Send("GET", "foo")
+//  c.Flush()
+//  c.Receive() // reply from SET
+//  v, err = c.Receive() // reply from GET
+//
+// The Do method combines the functionality of the Send, Flush and Receive
+// methods. The Do method starts by writing the command and flushing the output
+// buffer. Next, the Do method receives all pending replies including the reply
+// for the command just sent by Do. If any of the received replies is an error,
+// then Do returns the error. If there are no errors, then Do returns the last
+// reply.
+//
+// Use the Send and Do methods to implement pipelined transactions.
+//
+//  c.Send("MULTI")
+//  c.Send("INCR", "foo")
+//  c.Send("INCR", "bar")
+//  r, err := c.Do("EXEC")
+//  fmt.Println(r) // prints [1, 1]
+//
+// Thread Safety
+//
+// The connection Send and Flush methods cannot be called concurrently with
+// other calls to these methods. The connection Receive method cannot be called
+// concurrently  with other calls to Receive. Because the connection Do method
+// uses Send, Flush and Receive, the Do method cannot be called concurrently
+// with Send, Flush, Receive or Do. Unless stated otherwise, all other
+// concurrent access is allowed.
+//
+// Publish and Subscribe
+//  
+// Use the Send, Flush and Receive methods to implement Pub/Sub subscribers.
+//
+//  c.Send("SUBSCRIBE", "example")
+//  c.Flush()
+//  for {
+//      reply, err := c.Receive()
+//      if err != nil {
+//          return err
+//      }
+//      // process pushed message
+//  }
+// 
+// The PubSubConn type wraps a Conn with convenience methods for implementing
+// subscribers. The Subscribe, PSubscribe, Unsubscribe and PUnsubscribe methods
+// send and flush a subscription management command. The receive method
+// converts a pushed message to convenient types for use in a type switch.
+//
+//  psc := PubSubConn{c}
+//  psc.Subscribe("example")
+//  for {
+//      switch v := psc.Receive().(type) {
+//      case redis.Message:
+//          fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
+//      case redis.Subscription:
+//          fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
+//      case error:
+//          return v
+//  }
+package redis

+ 54 - 10
redis/pool.go

@@ -14,8 +14,14 @@
 
 package redis
 
+import (
+	"errors"
+)
+
 // Pool maintains a pool of connections.
 //
+// Pooled connections do not support concurrent access or pub/sub.
+//
 // The following example shows how to use a pool in a web application. The
 // application creates a pool at application startup and makes it available to
 // request handlers, possibly using a global variable:
@@ -54,7 +60,8 @@ type Pool struct {
 }
 
 type pooledConnection struct {
-	Conn
+	c    Conn
+	err  error
 	pool *Pool
 }
 
@@ -78,21 +85,58 @@ func (p *Pool) Get() (Conn, error) {
 			return nil, err
 		}
 	}
-	return &pooledConnection{Conn: c, pool: p}, nil
+	return &pooledConnection{c: c, pool: p}, nil
+}
+
+func (c *pooledConnection) Err() error {
+	if c.err != nil {
+		return c.err
+	}
+	return c.c.Err()
+}
+
+func (c *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
+	if c.err != nil {
+		return nil, c.err
+	}
+	return c.c.Do(commandName, args...)
+}
+
+func (c *pooledConnection) Send(commandName string, args ...interface{}) error {
+	if c.err != nil {
+		return c.err
+	}
+	return c.c.Send(commandName, args...)
+}
+
+func (c *pooledConnection) Flush() error {
+	if c.err != nil {
+		return c.err
+	}
+	return c.c.Flush()
 }
 
-func (c *pooledConnection) Close() error {
-	if c.Conn == nil {
-		return nil
+func (c *pooledConnection) Receive() (reply interface{}, err error) {
+	if c.err != nil {
+		return nil, c.err
+	}
+	return c.c.Receive()
+}
+
+var errPoolClosed = errors.New("redigo: pooled connection closed")
+
+func (c *pooledConnection) Close() (err error) {
+	if c.err != nil {
+		return c.err
 	}
-	if c.Err() != nil {
-		return nil
+	c.err = errPoolClosed
+	if c.c.Err() != nil {
+		return c.c.Close()
 	}
 	select {
-	case c.pool.conns <- c.Conn:
+	case c.pool.conns <- c.c:
 	default:
-		c.Conn.Close()
+		return c.c.Close()
 	}
-	c.Conn = nil
 	return nil
 }

+ 87 - 0
redis/pool_test.go

@@ -0,0 +1,87 @@
+// Copyright 2011 Gary Burd
+//
+// Licensed under the Apache License, Version 2.0 (the "License"): you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
+// under the License.
+
+package redis
+
+import (
+	"io"
+	"testing"
+)
+
+type fakeConn struct {
+	closed bool
+	err    error
+}
+
+func (c *fakeConn) Close() error { c.closed = true; return nil }
+func (c *fakeConn) Err() error   { return c.err }
+
+func (c *fakeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
+	return nil, nil
+}
+
+func (c *fakeConn) Send(commandName string, args ...interface{}) error {
+	return nil
+}
+
+func (c *fakeConn) Flush() error {
+	return nil
+}
+
+func (c *fakeConn) Receive() (reply interface{}, err error) {
+	return nil, nil
+}
+
+func TestPool(t *testing.T) {
+	var count int
+	p := NewPool(func() (Conn, error) { count += 1; return &fakeConn{}, nil }, 2)
+
+	count = 0
+	for i := 0; i < 10; i++ {
+		c1, _ := p.Get()
+		c2, _ := p.Get()
+		c1.Close()
+		c2.Close()
+	}
+	if count != 2 {
+		t.Fatal("expected count 1, actual", count)
+	}
+
+	p.Get()
+	p.Get()
+	count = 0
+	for i := 0; i < 10; i++ {
+		c, _ := p.Get()
+		c.(*pooledConnection).c.(*fakeConn).err = io.EOF
+		c.Close()
+	}
+	if count != 10 {
+		t.Fatal("expected count 10, actual", count)
+	}
+
+	p.Get()
+	p.Get()
+	count = 0
+	for i := 0; i < 10; i++ {
+		c1, _ := p.Get()
+		c2, _ := p.Get()
+		c3, _ := p.Get()
+		c1.Close()
+		c2.Close()
+		c3.Close()
+	}
+	if count != 12 {
+		t.Fatal("expected count 12, actual", count)
+	}
+}

+ 111 - 0
redis/pubsub.go

@@ -0,0 +1,111 @@
+// Copyright 2012 Gary Burd
+//
+// Licensed under the Apache License, Version 2.0 (the "License"): you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
+// under the License.
+
+package redis
+
+import (
+	"bytes"
+)
+
+// Subscribe represents a subscribe or unsubscribe notification.
+type Subscription struct {
+
+	// Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe"
+	Kind string
+
+	// The channel that was changed.
+	Channel string
+
+	// The current number of subscriptions for connection.
+	Count int
+}
+
+// Message represents a message notification.
+type Message struct {
+
+	// The originating channel.
+	Channel string
+
+	// The message data.
+	Data []byte
+}
+
+// PubSubConn wraps a Conn with convenience methods for subscribers.
+type PubSubConn struct {
+	Conn Conn
+}
+
+// Close closes the connection.
+func (c PubSubConn) Close() error {
+	return c.Conn.Close()
+}
+
+// Subscribe subscribes the connection to the specified channels.
+func (c PubSubConn) Subscribe(channel ...interface{}) error {
+	c.Conn.Send("SUBSCRIBE", channel...)
+	return c.Conn.Flush()
+}
+
+// PSubscribe subscribes the connection to the given patterns.
+func (c PubSubConn) PSubscribe(channel ...interface{}) error {
+	c.Conn.Send("PSUBSCRIBE", channel...)
+	return c.Conn.Flush()
+}
+
+// Unsubscribe unsubscribes the connection from the given channels, or from all
+// of them if none is given.
+func (c PubSubConn) Unsubscribe(channel ...interface{}) error {
+	c.Conn.Send("UNSUBSCRIBE", channel...)
+	return c.Conn.Flush()
+}
+
+// PUnsubscribe unsubscribes the connection from the given patterns, or from all
+// of them if none is given.
+func (c PubSubConn) PUnsubscribe(channel ...interface{}) error {
+	c.Conn.Send("PUNSUBSCRIBE", channel...)
+	return c.Conn.Flush()
+}
+
+var messageBytes = []byte("message")
+
+// Receive returns a pushed message as a Subscription, Message or error. The
+// return value is intended to be used directly in a type switch as illustrated
+// in the PubSubConn example.
+func (c PubSubConn) Receive() interface{} {
+	multiBulk, err := MultiBulk(c.Conn.Receive())
+	if err != nil {
+		return err
+	}
+
+	var kind []byte
+	var channel string
+	multiBulk, err = Values(multiBulk, &kind, &channel)
+	if err != nil {
+		return err
+	}
+
+	if bytes.Equal(kind, messageBytes) {
+		var data []byte
+		if _, err := Values(multiBulk, &data); err != nil {
+			return err
+		}
+		return Message{channel, data}
+	}
+
+	var count int
+	if _, err := Values(multiBulk, &count); err != nil {
+		return err
+	}
+	return Subscription{string(kind), channel, count}
+}

+ 128 - 0
redis/pubsub_test.go

@@ -0,0 +1,128 @@
+// Copyright 2012 Gary Burd
+//
+// Licensed under the Apache License, Version 2.0 (the "License"): you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
+// under the License.
+
+package redis_test
+
+import (
+	"fmt"
+	"github.com/garyburd/redigo/redis"
+	"net"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+)
+
+func publish(channel, value interface{}) {
+	c, err := dial()
+	if err != nil {
+		panic(err)
+	}
+	defer c.Close()
+	c.Do("PUBLISH", channel, value)
+}
+
+// Applications can receive pushed messages from one goroutine and manage subscriptions from another goroutine.
+func ExamplePubSubConn() {
+	c, err := dial()
+	if err != nil {
+		panic(err)
+	}
+	defer c.Close()
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	psc := redis.PubSubConn{c}
+
+	// This goroutine receives and prints pushed messages from the server. The
+	// goroutine exits when the connection is unsubscribed from all channels or
+	// there is an error.
+	go func() {
+		defer wg.Done()
+		for {
+			switch n := psc.Receive().(type) {
+			case redis.Message:
+				fmt.Printf("%s: message: %s\n", n.Channel, n.Data)
+			case redis.Subscription:
+				fmt.Printf("%s: %s %d\n", n.Channel, n.Kind, n.Count)
+				if n.Count == 0 {
+					return
+				}
+			case error:
+				fmt.Printf("error: %v\n", n)
+				return
+			}
+		}
+	}()
+
+	// This goroutine manages subscriptions for the connection. 
+	go func() {
+		defer wg.Done()
+
+		psc.Subscribe("example")
+
+		// The following function calls publish a message using another
+		// connection to the Redis server.
+		publish("example", "hello")
+		publish("example", "world")
+
+		// Unsubscribe from all connections. This will cause the receiving
+		// goroutine to exit.
+		psc.Unsubscribe()
+	}()
+
+	wg.Wait()
+
+	// Output:
+	// example: subscribe 1
+	// example: message: hello
+	// example: message: world
+	// example: unsubscribe 0
+}
+
+func expectPushed(t *testing.T, c redis.PubSubConn, message string, expected interface{}) {
+	actual := c.Receive()
+	if !reflect.DeepEqual(actual, expected) {
+		t.Errorf("%s = %v, want %v", message, actual, expected)
+	}
+}
+
+func TestPushed(t *testing.T) {
+	pc := dialt(t)
+	defer pc.Close()
+
+	nc, err := net.Dial("tcp", ":6379")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer nc.Close()
+	nc.SetReadDeadline(time.Now().Add(4 * time.Second))
+
+	c := redis.PubSubConn{redis.NewConn(nc, 0, 0)}
+
+	c.Subscribe("c1")
+	expectPushed(t, c, "Subscribe(c1)", redis.Subscription{"subscribe", "c1", 1})
+	c.Subscribe("c2")
+	expectPushed(t, c, "Subscribe(c2)", redis.Subscription{"subscribe", "c2", 2})
+	c.PSubscribe("p1")
+	expectPushed(t, c, "PSubscribe(p1)", redis.Subscription{"psubscribe", "p1", 3})
+	c.PSubscribe("p2")
+	expectPushed(t, c, "PSubscribe(p2)", redis.Subscription{"psubscribe", "p2", 4})
+	c.PUnsubscribe()
+	expectPushed(t, c, "Punsubscribe(p1)", redis.Subscription{"punsubscribe", "p1", 3})
+	expectPushed(t, c, "Punsubscribe()", redis.Subscription{"punsubscribe", "p2", 2})
+
+	pc.Do("PUBLISH", "c1", "hello")
+	expectPushed(t, c, "PUBLISH c1 hello", redis.Message{"c1", []byte("hello")})
+}

+ 5 - 99
redis/redis.go

@@ -12,105 +12,8 @@
 // License for the specific language governing permissions and limitations
 // under the License.
 
-// Package redis is a client for the Redis database.
-//
-// Package redis only supports the binary-safe Redis protocol, so you can use
-// it with any Redis version >= 1.2.0.
-//
-// Connections
-//
-// The Conn interface is the primary interface for working with Redis.
-// Applications create connections by calling the Dial, DialWithTimeout or
-// NewConn functions. In the future, functions will be added for creating
-// pooled connections and sharded connections.
-//
-// The application must call the connection Close method when the application
-// is done with the connection.
-//
-// Executing Commands
-//
-// The Conn interface has a generic method for executing Redis commands:
-//
-//  Do(commandName string, args ...interface{}) (reply interface{}, err error)
-//
-// Arguments of type string and []byte are sent to the server as is. The value
-// false is converted to "0" and the value true is converted to "1". The value
-// nil is converted to "". All other values are converted to a string using the
-// fmt.Fprint function. Command replies are represented using the following Go
-// types:
-//
-//  Redis type          Go type
-//  error               redis.Error
-//  integer             int64
-//  status              string
-//  bulk                []byte or nil if value not present.
-//  multi-bulk          []interface{} or nil if value not present.
-// 
-// Applications can use type assertions or type switches to determine the type
-// of a reply.
-//
-// Pipelining
-//
-// Connections support pipelining using the Send and Receive methods. 
-//
-//  Send(commandName string, args ...interface{}) error
-//  Receive() (reply interface{}, err error)
-//
-// Send writes the command to the connection's output buffer. Receive flushes
-// the output buffer to the server and reads a single reply. The following
-// example shows a simple pipeline:
-//
-//  c.Send("SET", "foo", "bar")
-//  c.Send("GET", "foo")
-//  // reply from SET
-//  if _, err := c.Receive(); err != nil {
-//      return err
-//  }
-//  // reply from GET
-//  v, err := c.Receive()
-//  if err != nil {
-//      return err
-//  }
-//
-// The Do method is implemented with the Send and Receive methods. The method
-// starts by sending the command. Next, the method receives all unconsumed
-// replies including the reply for the command just sent by Do. If any of the
-// received replies is an error, then Do returns the error. If there are no
-// errors, then Do returns the last reply.
-//
-// The Send and Do methods can be used together to implement pipelined
-// transactions:
-//
-//  c.Send("MULTI")
-//  c.Send("INCR", "foo")
-//  c.Send("INCR", "bar")
-//  r, err := c.Do("EXEC")
-//  fmt.Println(r) // prints [1, 1]
-//
-// Publish and Subscribe
-//  
-// The connection Receive method is used to implement blocking subscribers: 
-//
-//  c.Send("SUBSCRIBE", "foo")
-//  for {
-//      reply, err := c.Receive()
-//      if err != nil {
-//          return err
-//      }
-//      // consume message
-//  }
-//
-// Thread Safety
-//
-// The Send method cannot be called concurrently with other calls to Send. The
-// Receive method cannot be called concurrently  with other calls to Receive.
-// Because the Do method invokes Send and Receive, the Do method cannot be
-// called concurrently  with Send, Receive or Do. All other concurrent access is
-// allowed.
 package redis
 
-import ()
-
 // Error represents an error returned in a command reply.
 type Error string
 
@@ -127,9 +30,12 @@ type Conn interface {
 	// Do sends a command to the server and returns the received reply.
 	Do(commandName string, args ...interface{}) (reply interface{}, err error)
 
-	// Send sends a command for the server without waiting for a reply.
+	// Send writes the command to the client's output buffer.
 	Send(commandName string, args ...interface{}) error
 
-	// Receive receives a single reply from the server
+	// Flush flushes the output buffer to the Redis server.
+	Flush() error
+
+	// Receive receives a single reply from the Redis server
 	Receive() (reply interface{}, err error)
 }

+ 0 - 52
redis/reply.go

@@ -177,55 +177,3 @@ func MultiBulk(v interface{}, err error) ([]interface{}, error) {
 	}
 	return nil, fmt.Errorf("redigo: unexpected type for MultiBulk, got type %T", v)
 }
-
-// Subscribe represents a subscribe or unsubscribe notification.
-type Subscription struct {
-
-	// Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe"
-	Kind string
-
-	// The channel that was changed.
-	Channel string
-
-	// The current number of subscriptions for connection.
-	Count int
-}
-
-// Message represents a message notification.
-type Message struct {
-
-	// The originating channel.
-	Channel string
-
-	// The message data.
-	Data []byte
-}
-
-// Notification is a helper that returns a pub/sub notification as a
-// Subscription or a Message.
-func Notification(reply interface{}, err error) (interface{}, error) {
-	multiBulk, err := MultiBulk(reply, err)
-	if err != nil {
-		return nil, err
-	}
-
-	var kind, channel string
-	multiBulk, err = Values(multiBulk, &kind, &channel)
-	if err != nil {
-		return nil, err
-	}
-
-	if kind == "message" {
-		var data []byte
-		if _, err := Values(multiBulk, &data); err != nil {
-			return nil, err
-		}
-		return Message{channel, data}, nil
-	}
-
-	var count int
-	if _, err := Values(multiBulk, &count); err != nil {
-		return nil, err
-	}
-	return Subscription{kind, channel, count}, nil
-}

+ 0 - 62
redis/reply_test.go

@@ -17,10 +17,6 @@ package redis_test
 import (
 	"fmt"
 	"github.com/garyburd/redigo/redis"
-	"net"
-	"reflect"
-	"testing"
-	"time"
 )
 
 func ExampleBool() {
@@ -67,61 +63,3 @@ func ExampleString() {
 	// Output:
 	// "world"
 }
-
-func ExampleNotification(c redis.Conn) {
-	c.Send("SUBSCRIBE", "mychannel")
-	for {
-		n, err := redis.Notification(c.Receive())
-		if err != nil {
-			break
-		}
-		switch n := n.(type) {
-		case redis.Message:
-			fmt.Printf("%s: message: %s", n.Channel, n.Data)
-		case redis.Subscription:
-			fmt.Printf("%s: %s %d", n.Channel, n.Kind, n.Count)
-		default:
-			panic("unexpected")
-		}
-	}
-}
-
-func expectNotification(t *testing.T, c redis.Conn, message string, expected interface{}) {
-	actual, err := redis.Notification(c.Receive())
-	if err != nil {
-		t.Errorf("%s returned error %v", message, err)
-		return
-	}
-	if !reflect.DeepEqual(actual, expected) {
-		t.Errorf("%s = %v, want %v", message, actual, expected)
-	}
-}
-
-func TestNotification(t *testing.T) {
-	pc := dialt(t)
-	defer pc.Close()
-
-	nc, err := net.Dial("tcp", ":6379")
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer nc.Close()
-	nc.SetReadDeadline(time.Now().Add(4 * time.Second))
-
-	c := redis.NewConn(nc)
-
-	c.Send("SUBSCRIBE", "c1")
-	expectNotification(t, c, "Subscribe(c1)", redis.Subscription{"subscribe", "c1", 1})
-	c.Send("SUBSCRIBE", "c2")
-	expectNotification(t, c, "Subscribe(c2)", redis.Subscription{"subscribe", "c2", 2})
-	c.Send("PSUBSCRIBE", "p1")
-	expectNotification(t, c, "PSubscribe(p1)", redis.Subscription{"psubscribe", "p1", 3})
-	c.Send("PSUBSCRIBE", "p2")
-	expectNotification(t, c, "PSubscribe(p2)", redis.Subscription{"psubscribe", "p2", 4})
-	c.Send("PUNSUBSCRIBE")
-	expectNotification(t, c, "Punsubscribe(p1)", redis.Subscription{"punsubscribe", "p1", 3})
-	expectNotification(t, c, "Punsubscribe()", redis.Subscription{"punsubscribe", "p2", 2})
-
-	pc.Do("PUBLISH", "c1", "hello")
-	expectNotification(t, c, "PUBLISH c1 hello", redis.Message{"c1", []byte("hello")})
-}

+ 10 - 0
redis/script_test.go

@@ -60,6 +60,11 @@ func TestScript(t *testing.T) {
 		t.Errorf("s.SendHash(c, ...) returned %v", err)
 	}
 
+	err = c.Flush()
+	if err != nil {
+		t.Errorf("s.Flush() returned %v", err)
+	}
+
 	v, err = c.Receive()
 	if !reflect.DeepEqual(v, reply) {
 		t.Errorf("s.SendHash(c, ..); s.Recevie() = %v, want %v", v, reply)
@@ -70,6 +75,11 @@ func TestScript(t *testing.T) {
 		t.Errorf("s.Send(c, ...) returned %v", err)
 	}
 
+	err = c.Flush()
+	if err != nil {
+		t.Errorf("s.Flush() returned %v", err)
+	}
+
 	v, err = c.Receive()
 	if !reflect.DeepEqual(v, reply) {
 		t.Errorf("s.Send(c, ..); s.Recevie() = %v, want %v", v, reply)

+ 1 - 1
redis/test_test.go

@@ -20,5 +20,5 @@ import (
 
 // NewConnBufio is a hook for tests.
 func NewConnBufio(rw bufio.ReadWriter) Conn {
-	return &conn{rw: rw}
+	return &conn{br: rw.Reader, bw: rw.Writer}
 }