pubsub.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. // Copyright 2012 Gary Burd
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  4. // not use this file except in compliance with the License. You may obtain
  5. // a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. // License for the specific language governing permissions and limitations
  13. // under the License.
  14. package redis
  15. import (
  16. "errors"
  17. )
  18. // Subscription represents a subscribe or unsubscribe notification.
  19. type Subscription struct {
  20. // Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe"
  21. Kind string
  22. // The channel that was changed.
  23. Channel string
  24. // The current number of subscriptions for connection.
  25. Count int
  26. }
  27. // Message represents a message notification.
  28. type Message struct {
  29. // The originating channel.
  30. Channel string
  31. // The message data.
  32. Data []byte
  33. }
  34. // PMessage represents a pmessage notification.
  35. type PMessage struct {
  36. // The matched pattern.
  37. Pattern string
  38. // The originating channel.
  39. Channel string
  40. // The message data.
  41. Data []byte
  42. }
  43. // PubSubConn wraps a Conn with convenience methods for subscribers.
  44. type PubSubConn struct {
  45. Conn Conn
  46. }
  47. // Close closes the connection.
  48. func (c PubSubConn) Close() error {
  49. return c.Conn.Close()
  50. }
  51. // Subscribe subscribes the connection to the specified channels.
  52. func (c PubSubConn) Subscribe(channel ...interface{}) error {
  53. c.Conn.Send("SUBSCRIBE", channel...)
  54. return c.Conn.Flush()
  55. }
  56. // PSubscribe subscribes the connection to the given patterns.
  57. func (c PubSubConn) PSubscribe(channel ...interface{}) error {
  58. c.Conn.Send("PSUBSCRIBE", channel...)
  59. return c.Conn.Flush()
  60. }
  61. // Unsubscribe unsubscribes the connection from the given channels, or from all
  62. // of them if none is given.
  63. func (c PubSubConn) Unsubscribe(channel ...interface{}) error {
  64. c.Conn.Send("UNSUBSCRIBE", channel...)
  65. return c.Conn.Flush()
  66. }
  67. // PUnsubscribe unsubscribes the connection from the given patterns, or from all
  68. // of them if none is given.
  69. func (c PubSubConn) PUnsubscribe(channel ...interface{}) error {
  70. c.Conn.Send("PUNSUBSCRIBE", channel...)
  71. return c.Conn.Flush()
  72. }
  73. // Receive returns a pushed message as a Subscription, Message, PMessage or
  74. // error. The return value is intended to be used directly in a type switch as
  75. // illustrated in the PubSubConn example.
  76. func (c PubSubConn) Receive() interface{} {
  77. reply, err := Values(c.Conn.Receive())
  78. if err != nil {
  79. return err
  80. }
  81. var kind string
  82. reply, err = Scan(reply, &kind)
  83. if err != nil {
  84. return err
  85. }
  86. switch kind {
  87. case "message":
  88. var m Message
  89. if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {
  90. return err
  91. }
  92. return m
  93. case "pmessage":
  94. var pm PMessage
  95. if _, err := Scan(reply, &pm.Pattern, &pm.Channel, &pm.Data); err != nil {
  96. return err
  97. }
  98. return pm
  99. case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
  100. s := Subscription{Kind: kind}
  101. if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
  102. return err
  103. }
  104. return s
  105. }
  106. return errors.New("redigo: unknown pubsub notification")
  107. }