pubsub_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. // Copyright 2012 Gary Burd
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  4. // not use this file except in compliance with the License. You may obtain
  5. // a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. // License for the specific language governing permissions and limitations
  13. // under the License.
  14. package redis_test
  15. import (
  16. "fmt"
  17. "github.com/garyburd/redigo/redis"
  18. "net"
  19. "reflect"
  20. "sync"
  21. "testing"
  22. "time"
  23. )
  24. func publish(channel, value interface{}) {
  25. c, err := dial()
  26. if err != nil {
  27. panic(err)
  28. }
  29. defer c.Close()
  30. c.Do("PUBLISH", channel, value)
  31. }
  32. // Applications can receive pushed messages from one goroutine and manage subscriptions from another goroutine.
  33. func ExamplePubSubConn() {
  34. c, err := dial()
  35. if err != nil {
  36. panic(err)
  37. }
  38. defer c.Close()
  39. var wg sync.WaitGroup
  40. wg.Add(2)
  41. psc := redis.PubSubConn{c}
  42. // This goroutine receives and prints pushed messages from the server. The
  43. // goroutine exits when the connection is unsubscribed from all channels or
  44. // there is an error.
  45. go func() {
  46. defer wg.Done()
  47. for {
  48. switch n := psc.Receive().(type) {
  49. case redis.Message:
  50. fmt.Printf("%s: message: %s\n", n.Channel, n.Data)
  51. case redis.Subscription:
  52. fmt.Printf("%s: %s %d\n", n.Channel, n.Kind, n.Count)
  53. if n.Count == 0 {
  54. return
  55. }
  56. case error:
  57. fmt.Printf("error: %v\n", n)
  58. return
  59. }
  60. }
  61. }()
  62. // This goroutine manages subscriptions for the connection.
  63. go func() {
  64. defer wg.Done()
  65. psc.Subscribe("example")
  66. // The following function calls publish a message using another
  67. // connection to the Redis server.
  68. publish("example", "hello")
  69. publish("example", "world")
  70. // Unsubscribe from all connections. This will cause the receiving
  71. // goroutine to exit.
  72. psc.Unsubscribe()
  73. }()
  74. wg.Wait()
  75. // Output:
  76. // example: subscribe 1
  77. // example: message: hello
  78. // example: message: world
  79. // example: unsubscribe 0
  80. }
  81. func expectPushed(t *testing.T, c redis.PubSubConn, message string, expected interface{}) {
  82. actual := c.Receive()
  83. if !reflect.DeepEqual(actual, expected) {
  84. t.Errorf("%s = %v, want %v", message, actual, expected)
  85. }
  86. }
  87. func TestPushed(t *testing.T) {
  88. pc := dialt(t)
  89. defer pc.Close()
  90. nc, err := net.Dial("tcp", ":6379")
  91. if err != nil {
  92. t.Fatal(err)
  93. }
  94. defer nc.Close()
  95. nc.SetReadDeadline(time.Now().Add(4 * time.Second))
  96. c := redis.PubSubConn{redis.NewConn(nc, 0, 0)}
  97. c.Subscribe("c1")
  98. expectPushed(t, c, "Subscribe(c1)", redis.Subscription{"subscribe", "c1", 1})
  99. c.Subscribe("c2")
  100. expectPushed(t, c, "Subscribe(c2)", redis.Subscription{"subscribe", "c2", 2})
  101. c.PSubscribe("p1")
  102. expectPushed(t, c, "PSubscribe(p1)", redis.Subscription{"psubscribe", "p1", 3})
  103. c.PSubscribe("p2")
  104. expectPushed(t, c, "PSubscribe(p2)", redis.Subscription{"psubscribe", "p2", 4})
  105. c.PUnsubscribe()
  106. expectPushed(t, c, "Punsubscribe(p1)", redis.Subscription{"punsubscribe", "p1", 3})
  107. expectPushed(t, c, "Punsubscribe()", redis.Subscription{"punsubscribe", "p2", 2})
  108. pc.Do("PUBLISH", "c1", "hello")
  109. expectPushed(t, c, "PUBLISH c1 hello", redis.Message{"c1", []byte("hello")})
  110. }