Prechádzať zdrojové kódy

Add pmessage support to PubSubConn.

Gary Burd 13 rokov pred
rodič
commit
f3b6a4a61c
2 zmenil súbory, kde vykonal 56 pridanie a 28 odobranie
  1. 37 19
      redis/pubsub.go
  2. 19 9
      redis/pubsub_test.go

+ 37 - 19
redis/pubsub.go

@@ -15,7 +15,7 @@
 package redis
 package redis
 
 
 import (
 import (
-	"bytes"
+	"errors"
 )
 )
 
 
 // Subscribe represents a subscribe or unsubscribe notification.
 // Subscribe represents a subscribe or unsubscribe notification.
@@ -41,6 +41,19 @@ type Message struct {
 	Data []byte
 	Data []byte
 }
 }
 
 
+// PMessage represents a pmessage notification.
+type PMessage struct {
+
+	// The matched pattern.
+	Pattern string
+
+	// The originating channel.
+	Channel string
+
+	// The message data.
+	Data []byte
+}
+
 // PubSubConn wraps a Conn with convenience methods for subscribers.
 // PubSubConn wraps a Conn with convenience methods for subscribers.
 type PubSubConn struct {
 type PubSubConn struct {
 	Conn Conn
 	Conn Conn
@@ -77,35 +90,40 @@ func (c PubSubConn) PUnsubscribe(channel ...interface{}) error {
 	return c.Conn.Flush()
 	return c.Conn.Flush()
 }
 }
 
 
-var messageBytes = []byte("message")
-
-// Receive returns a pushed message as a Subscription, Message or error. The
-// return value is intended to be used directly in a type switch as illustrated
-// in the PubSubConn example.
+// Receive returns a pushed message as a Subscription, Message, PMessage or
+// error. The return value is intended to be used directly in a type switch as
+// illustrated in the PubSubConn example.
 func (c PubSubConn) Receive() interface{} {
 func (c PubSubConn) Receive() interface{} {
 	multiBulk, err := MultiBulk(c.Conn.Receive())
 	multiBulk, err := MultiBulk(c.Conn.Receive())
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
-	var kind []byte
-	var channel string
-	multiBulk, err = Values(multiBulk, &kind, &channel)
+	var kind string
+	multiBulk, err = Values(multiBulk, &kind)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
-	if bytes.Equal(kind, messageBytes) {
-		var data []byte
-		if _, err := Values(multiBulk, &data); err != nil {
+	switch kind {
+	case "message":
+		var m Message
+		if _, err := Values(multiBulk, &m.Channel, &m.Data); err != nil {
 			return err
 			return err
 		}
 		}
-		return Message{channel, data}
-	}
-
-	var count int
-	if _, err := Values(multiBulk, &count); err != nil {
-		return err
+		return m
+	case "pmessage":
+		var pm PMessage
+		if _, err := Values(multiBulk, &pm.Pattern, &pm.Channel, &pm.Data); err != nil {
+			return err
+		}
+		return pm
+	case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
+		s := Subscription{Kind: kind}
+		if _, err := Values(multiBulk, &s.Channel, &s.Count); err != nil {
+			return err
+		}
+		return s
 	}
 	}
-	return Subscription{string(kind), channel, count}
+	return errors.New("redigo: unknown pubsub notification")
 }
 }

+ 19 - 9
redis/pubsub_test.go

@@ -45,17 +45,19 @@ func ExamplePubSubConn() {
 
 
 	psc := redis.PubSubConn{c}
 	psc := redis.PubSubConn{c}
 
 
-	// This goroutine receives and prints pushed messages from the server. The
-	// goroutine exits when the connection is unsubscribed from all channels or
-	// there is an error.
+	// This goroutine receives and prints pushed notifications from the server.
+	// The goroutine exits when the connection is unsubscribed from all
+	// channels or there is an error.
 	go func() {
 	go func() {
 		defer wg.Done()
 		defer wg.Done()
 		for {
 		for {
 			switch n := psc.Receive().(type) {
 			switch n := psc.Receive().(type) {
 			case redis.Message:
 			case redis.Message:
-				fmt.Printf("%s: message: %s\n", n.Channel, n.Data)
+				fmt.Printf("Message: %s %s\n", n.Channel, n.Data)
+			case redis.PMessage:
+				fmt.Printf("PMessage: %s %s %s\n", n.Pattern, n.Channel, n.Data)
 			case redis.Subscription:
 			case redis.Subscription:
-				fmt.Printf("%s: %s %d\n", n.Channel, n.Kind, n.Count)
+				fmt.Printf("Subscription: %s %s %d\n", n.Kind, n.Channel, n.Count)
 				if n.Count == 0 {
 				if n.Count == 0 {
 					return
 					return
 				}
 				}
@@ -71,24 +73,32 @@ func ExamplePubSubConn() {
 		defer wg.Done()
 		defer wg.Done()
 
 
 		psc.Subscribe("example")
 		psc.Subscribe("example")
+		psc.PSubscribe("p*")
 
 
 		// The following function calls publish a message using another
 		// The following function calls publish a message using another
 		// connection to the Redis server.
 		// connection to the Redis server.
 		publish("example", "hello")
 		publish("example", "hello")
 		publish("example", "world")
 		publish("example", "world")
+		publish("pexample", "foo")
+		publish("pexample", "bar")
 
 
 		// Unsubscribe from all connections. This will cause the receiving
 		// Unsubscribe from all connections. This will cause the receiving
 		// goroutine to exit.
 		// goroutine to exit.
 		psc.Unsubscribe()
 		psc.Unsubscribe()
+		psc.PUnsubscribe()
 	}()
 	}()
 
 
 	wg.Wait()
 	wg.Wait()
 
 
 	// Output:
 	// Output:
-	// example: subscribe 1
-	// example: message: hello
-	// example: message: world
-	// example: unsubscribe 0
+	// Subscription: subscribe example 1
+	// Subscription: psubscribe p* 2
+	// Message: example hello
+	// Message: example world
+	// PMessage: p* pexample foo
+	// PMessage: p* pexample bar
+	// Subscription: unsubscribe example 1
+	// Subscription: punsubscribe p* 0
 }
 }
 
 
 func expectPushed(t *testing.T, c redis.PubSubConn, message string, expected interface{}) {
 func expectPushed(t *testing.T, c redis.PubSubConn, message string, expected interface{}) {