pubsub_example_test.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. // Copyright 2012 Gary Burd
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  4. // not use this file except in compliance with the License. You may obtain
  5. // a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. // License for the specific language governing permissions and limitations
  13. // under the License.
  14. // +build go1.7
  15. package redis_test
  16. import (
  17. "context"
  18. "fmt"
  19. "time"
  20. "github.com/gomodule/redigo/redis"
  21. )
  22. // listenPubSubChannels listens for messages on Redis pubsub channels. The
  23. // onStart function is called after the channels are subscribed. The onMessage
  24. // function is called for each message.
  25. func listenPubSubChannels(ctx context.Context, redisServerAddr string,
  26. onStart func() error,
  27. onMessage func(channel string, data []byte) error,
  28. channels ...string) error {
  29. // A ping is set to the server with this period to test for the health of
  30. // the connection and server.
  31. const healthCheckPeriod = time.Minute
  32. c, err := redis.Dial("tcp", redisServerAddr,
  33. // Read timeout on server should be greater than ping period.
  34. redis.DialReadTimeout(healthCheckPeriod+10*time.Second),
  35. redis.DialWriteTimeout(10*time.Second))
  36. if err != nil {
  37. return err
  38. }
  39. defer c.Close()
  40. psc := redis.PubSubConn{Conn: c}
  41. if err := psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil {
  42. return err
  43. }
  44. done := make(chan error, 1)
  45. // Start a goroutine to receive notifications from the server.
  46. go func() {
  47. for {
  48. switch n := psc.Receive().(type) {
  49. case error:
  50. done <- n
  51. return
  52. case redis.Message:
  53. if err := onMessage(n.Channel, n.Data); err != nil {
  54. done <- err
  55. return
  56. }
  57. case redis.Subscription:
  58. switch n.Count {
  59. case len(channels):
  60. // Notify application when all channels are subscribed.
  61. if err := onStart(); err != nil {
  62. done <- err
  63. return
  64. }
  65. case 0:
  66. // Return from the goroutine when all channels are unsubscribed.
  67. done <- nil
  68. return
  69. }
  70. }
  71. }
  72. }()
  73. ticker := time.NewTicker(healthCheckPeriod)
  74. defer ticker.Stop()
  75. loop:
  76. for err == nil {
  77. select {
  78. case <-ticker.C:
  79. // Send ping to test health of connection and server. If
  80. // corresponding pong is not received, then receive on the
  81. // connection will timeout and the receive goroutine will exit.
  82. if err = psc.Ping(""); err != nil {
  83. break loop
  84. }
  85. case <-ctx.Done():
  86. break loop
  87. case err := <-done:
  88. // Return error from the receive goroutine.
  89. return err
  90. }
  91. }
  92. // Signal the receiving goroutine to exit by unsubscribing from all channels.
  93. psc.Unsubscribe()
  94. // Wait for goroutine to complete.
  95. return <-done
  96. }
  97. func publish() {
  98. c, err := dial()
  99. if err != nil {
  100. fmt.Println(err)
  101. return
  102. }
  103. defer c.Close()
  104. c.Do("PUBLISH", "c1", "hello")
  105. c.Do("PUBLISH", "c2", "world")
  106. c.Do("PUBLISH", "c1", "goodbye")
  107. }
  108. // This example shows how receive pubsub notifications with cancelation and
  109. // health checks.
  110. func ExamplePubSubConn() {
  111. redisServerAddr, err := serverAddr()
  112. if err != nil {
  113. fmt.Println(err)
  114. return
  115. }
  116. ctx, cancel := context.WithCancel(context.Background())
  117. err = listenPubSubChannels(ctx,
  118. redisServerAddr,
  119. func() error {
  120. // The start callback is a good place to backfill missed
  121. // notifications. For the purpose of this example, a goroutine is
  122. // started to send notifications.
  123. go publish()
  124. return nil
  125. },
  126. func(channel string, message []byte) error {
  127. fmt.Printf("channel: %s, message: %s\n", channel, message)
  128. // For the purpose of this example, cancel the listener's context
  129. // after receiving last message sent by publish().
  130. if string(message) == "goodbye" {
  131. cancel()
  132. }
  133. return nil
  134. },
  135. "c1", "c2")
  136. if err != nil {
  137. fmt.Println(err)
  138. return
  139. }
  140. // Output:
  141. // channel: c1, message: hello
  142. // channel: c2, message: world
  143. // channel: c1, message: goodbye
  144. }