redis_test.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. package redis_test
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "net"
  7. "testing"
  8. "time"
  9. "github.com/go-redis/redis/v7"
  10. . "github.com/onsi/ginkgo"
  11. . "github.com/onsi/gomega"
  12. )
  13. type redisHookError struct {
  14. redis.Hook
  15. }
  16. var _ redis.Hook = redisHookError{}
  17. func (redisHookError) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
  18. return ctx, nil
  19. }
  20. func (redisHookError) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
  21. return errors.New("hook error")
  22. }
  23. func TestHookError(t *testing.T) {
  24. rdb := redis.NewClient(&redis.Options{
  25. Addr: ":6379",
  26. })
  27. rdb.AddHook(redisHookError{})
  28. err := rdb.Ping().Err()
  29. if err == nil {
  30. t.Fatalf("got nil, expected an error")
  31. }
  32. wanted := "hook error"
  33. if err.Error() != wanted {
  34. t.Fatalf(`got %q, wanted %q`, err, wanted)
  35. }
  36. }
  37. //------------------------------------------------------------------------------
  38. var _ = Describe("Client", func() {
  39. var client *redis.Client
  40. BeforeEach(func() {
  41. client = redis.NewClient(redisOptions())
  42. Expect(client.FlushDB().Err()).NotTo(HaveOccurred())
  43. })
  44. AfterEach(func() {
  45. client.Close()
  46. })
  47. It("should Stringer", func() {
  48. Expect(client.String()).To(Equal("Redis<:6380 db:15>"))
  49. })
  50. It("supports WithContext", func() {
  51. c, cancel := context.WithCancel(context.Background())
  52. cancel()
  53. err := client.WithContext(c).Ping().Err()
  54. Expect(err).To(MatchError("context canceled"))
  55. })
  56. It("supports WithTimeout", func() {
  57. err := client.ClientPause(time.Second).Err()
  58. Expect(err).NotTo(HaveOccurred())
  59. err = client.WithTimeout(10 * time.Millisecond).Ping().Err()
  60. Expect(err).To(HaveOccurred())
  61. err = client.Ping().Err()
  62. Expect(err).NotTo(HaveOccurred())
  63. })
  64. It("should ping", func() {
  65. val, err := client.Ping().Result()
  66. Expect(err).NotTo(HaveOccurred())
  67. Expect(val).To(Equal("PONG"))
  68. })
  69. It("should return pool stats", func() {
  70. Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{}))
  71. })
  72. It("should support custom dialers", func() {
  73. custom := redis.NewClient(&redis.Options{
  74. Network: "tcp",
  75. Addr: redisAddr,
  76. Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
  77. var d net.Dialer
  78. return d.DialContext(ctx, network, addr)
  79. },
  80. })
  81. val, err := custom.Ping().Result()
  82. Expect(err).NotTo(HaveOccurred())
  83. Expect(val).To(Equal("PONG"))
  84. Expect(custom.Close()).NotTo(HaveOccurred())
  85. })
  86. It("should close", func() {
  87. Expect(client.Close()).NotTo(HaveOccurred())
  88. err := client.Ping().Err()
  89. Expect(err).To(MatchError("redis: client is closed"))
  90. })
  91. It("should close pubsub without closing the client", func() {
  92. pubsub := client.Subscribe()
  93. Expect(pubsub.Close()).NotTo(HaveOccurred())
  94. _, err := pubsub.Receive()
  95. Expect(err).To(MatchError("redis: client is closed"))
  96. Expect(client.Ping().Err()).NotTo(HaveOccurred())
  97. })
  98. It("should close Tx without closing the client", func() {
  99. err := client.Watch(func(tx *redis.Tx) error {
  100. _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
  101. pipe.Ping()
  102. return nil
  103. })
  104. return err
  105. })
  106. Expect(err).NotTo(HaveOccurred())
  107. Expect(client.Ping().Err()).NotTo(HaveOccurred())
  108. })
  109. It("should close pipeline without closing the client", func() {
  110. pipeline := client.Pipeline()
  111. Expect(pipeline.Close()).NotTo(HaveOccurred())
  112. pipeline.Ping()
  113. _, err := pipeline.Exec()
  114. Expect(err).To(MatchError("redis: client is closed"))
  115. Expect(client.Ping().Err()).NotTo(HaveOccurred())
  116. })
  117. It("should close pubsub when client is closed", func() {
  118. pubsub := client.Subscribe()
  119. Expect(client.Close()).NotTo(HaveOccurred())
  120. _, err := pubsub.Receive()
  121. Expect(err).To(MatchError("redis: client is closed"))
  122. Expect(pubsub.Close()).NotTo(HaveOccurred())
  123. })
  124. It("should close pipeline when client is closed", func() {
  125. pipeline := client.Pipeline()
  126. Expect(client.Close()).NotTo(HaveOccurred())
  127. Expect(pipeline.Close()).NotTo(HaveOccurred())
  128. })
  129. It("should select DB", func() {
  130. db2 := redis.NewClient(&redis.Options{
  131. Addr: redisAddr,
  132. DB: 2,
  133. })
  134. Expect(db2.FlushDB().Err()).NotTo(HaveOccurred())
  135. Expect(db2.Get("db").Err()).To(Equal(redis.Nil))
  136. Expect(db2.Set("db", 2, 0).Err()).NotTo(HaveOccurred())
  137. n, err := db2.Get("db").Int64()
  138. Expect(err).NotTo(HaveOccurred())
  139. Expect(n).To(Equal(int64(2)))
  140. Expect(client.Get("db").Err()).To(Equal(redis.Nil))
  141. Expect(db2.FlushDB().Err()).NotTo(HaveOccurred())
  142. Expect(db2.Close()).NotTo(HaveOccurred())
  143. })
  144. It("processes custom commands", func() {
  145. cmd := redis.NewCmd("PING")
  146. _ = client.Process(cmd)
  147. // Flush buffers.
  148. Expect(client.Echo("hello").Err()).NotTo(HaveOccurred())
  149. Expect(cmd.Err()).NotTo(HaveOccurred())
  150. Expect(cmd.Val()).To(Equal("PONG"))
  151. })
  152. It("should retry command on network error", func() {
  153. Expect(client.Close()).NotTo(HaveOccurred())
  154. client = redis.NewClient(&redis.Options{
  155. Addr: redisAddr,
  156. MaxRetries: 1,
  157. })
  158. // Put bad connection in the pool.
  159. cn, err := client.Pool().Get(context.Background())
  160. Expect(err).NotTo(HaveOccurred())
  161. cn.SetNetConn(&badConn{})
  162. client.Pool().Put(cn)
  163. err = client.Ping().Err()
  164. Expect(err).NotTo(HaveOccurred())
  165. })
  166. It("should retry with backoff", func() {
  167. clientNoRetry := redis.NewClient(&redis.Options{
  168. Addr: ":1234",
  169. MaxRetries: 0,
  170. })
  171. defer clientNoRetry.Close()
  172. clientRetry := redis.NewClient(&redis.Options{
  173. Addr: ":1234",
  174. MaxRetries: 5,
  175. MaxRetryBackoff: 128 * time.Millisecond,
  176. })
  177. defer clientRetry.Close()
  178. startNoRetry := time.Now()
  179. err := clientNoRetry.Ping().Err()
  180. Expect(err).To(HaveOccurred())
  181. elapseNoRetry := time.Since(startNoRetry)
  182. startRetry := time.Now()
  183. err = clientRetry.Ping().Err()
  184. Expect(err).To(HaveOccurred())
  185. elapseRetry := time.Since(startRetry)
  186. Expect(elapseRetry).To(BeNumerically(">", elapseNoRetry, 10*time.Millisecond))
  187. })
  188. It("should update conn.UsedAt on read/write", func() {
  189. cn, err := client.Pool().Get(context.Background())
  190. Expect(err).NotTo(HaveOccurred())
  191. Expect(cn.UsedAt).NotTo(BeZero())
  192. createdAt := cn.UsedAt()
  193. client.Pool().Put(cn)
  194. Expect(cn.UsedAt().Equal(createdAt)).To(BeTrue())
  195. time.Sleep(time.Second)
  196. err = client.Ping().Err()
  197. Expect(err).NotTo(HaveOccurred())
  198. cn, err = client.Pool().Get(context.Background())
  199. Expect(err).NotTo(HaveOccurred())
  200. Expect(cn).NotTo(BeNil())
  201. Expect(cn.UsedAt().After(createdAt)).To(BeTrue())
  202. })
  203. It("should process command with special chars", func() {
  204. set := client.Set("key", "hello1\r\nhello2\r\n", 0)
  205. Expect(set.Err()).NotTo(HaveOccurred())
  206. Expect(set.Val()).To(Equal("OK"))
  207. get := client.Get("key")
  208. Expect(get.Err()).NotTo(HaveOccurred())
  209. Expect(get.Val()).To(Equal("hello1\r\nhello2\r\n"))
  210. })
  211. It("should handle big vals", func() {
  212. bigVal := bytes.Repeat([]byte{'*'}, 2e6)
  213. err := client.Set("key", bigVal, 0).Err()
  214. Expect(err).NotTo(HaveOccurred())
  215. // Reconnect to get new connection.
  216. Expect(client.Close()).NotTo(HaveOccurred())
  217. client = redis.NewClient(redisOptions())
  218. got, err := client.Get("key").Bytes()
  219. Expect(err).NotTo(HaveOccurred())
  220. Expect(got).To(Equal(bigVal))
  221. })
  222. })
  223. var _ = Describe("Client timeout", func() {
  224. var opt *redis.Options
  225. var client *redis.Client
  226. AfterEach(func() {
  227. Expect(client.Close()).NotTo(HaveOccurred())
  228. })
  229. testTimeout := func() {
  230. It("Ping timeouts", func() {
  231. err := client.Ping().Err()
  232. Expect(err).To(HaveOccurred())
  233. Expect(err.(net.Error).Timeout()).To(BeTrue())
  234. })
  235. It("Pipeline timeouts", func() {
  236. _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
  237. pipe.Ping()
  238. return nil
  239. })
  240. Expect(err).To(HaveOccurred())
  241. Expect(err.(net.Error).Timeout()).To(BeTrue())
  242. })
  243. It("Subscribe timeouts", func() {
  244. if opt.WriteTimeout == 0 {
  245. return
  246. }
  247. pubsub := client.Subscribe()
  248. defer pubsub.Close()
  249. err := pubsub.Subscribe("_")
  250. Expect(err).To(HaveOccurred())
  251. Expect(err.(net.Error).Timeout()).To(BeTrue())
  252. })
  253. It("Tx timeouts", func() {
  254. err := client.Watch(func(tx *redis.Tx) error {
  255. return tx.Ping().Err()
  256. })
  257. Expect(err).To(HaveOccurred())
  258. Expect(err.(net.Error).Timeout()).To(BeTrue())
  259. })
  260. It("Tx Pipeline timeouts", func() {
  261. err := client.Watch(func(tx *redis.Tx) error {
  262. _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
  263. pipe.Ping()
  264. return nil
  265. })
  266. return err
  267. })
  268. Expect(err).To(HaveOccurred())
  269. Expect(err.(net.Error).Timeout()).To(BeTrue())
  270. })
  271. }
  272. Context("read timeout", func() {
  273. BeforeEach(func() {
  274. opt = redisOptions()
  275. opt.ReadTimeout = time.Nanosecond
  276. opt.WriteTimeout = -1
  277. client = redis.NewClient(opt)
  278. })
  279. testTimeout()
  280. })
  281. Context("write timeout", func() {
  282. BeforeEach(func() {
  283. opt = redisOptions()
  284. opt.ReadTimeout = -1
  285. opt.WriteTimeout = time.Nanosecond
  286. client = redis.NewClient(opt)
  287. })
  288. testTimeout()
  289. })
  290. })
  291. var _ = Describe("Client OnConnect", func() {
  292. var client *redis.Client
  293. BeforeEach(func() {
  294. opt := redisOptions()
  295. opt.DB = 0
  296. opt.OnConnect = func(cn *redis.Conn) error {
  297. return cn.ClientSetName("on_connect").Err()
  298. }
  299. client = redis.NewClient(opt)
  300. })
  301. AfterEach(func() {
  302. Expect(client.Close()).NotTo(HaveOccurred())
  303. })
  304. It("calls OnConnect", func() {
  305. name, err := client.ClientGetName().Result()
  306. Expect(err).NotTo(HaveOccurred())
  307. Expect(name).To(Equal("on_connect"))
  308. })
  309. })