pool_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746
  1. // Copyright 2011 Gary Burd
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  4. // not use this file except in compliance with the License. You may obtain
  5. // a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. // License for the specific language governing permissions and limitations
  13. // under the License.
  14. package redis_test
  15. import (
  16. "errors"
  17. "io"
  18. "reflect"
  19. "sync"
  20. "testing"
  21. "time"
  22. "github.com/gomodule/redigo/redis"
  23. )
  24. type poolTestConn struct {
  25. d *poolDialer
  26. err error
  27. redis.Conn
  28. }
  29. func (c *poolTestConn) Close() error {
  30. c.d.mu.Lock()
  31. c.d.open -= 1
  32. c.d.mu.Unlock()
  33. return c.Conn.Close()
  34. }
  35. func (c *poolTestConn) Err() error { return c.err }
  36. func (c *poolTestConn) Do(commandName string, args ...interface{}) (interface{}, error) {
  37. if commandName == "ERR" {
  38. c.err = args[0].(error)
  39. commandName = "PING"
  40. }
  41. if commandName != "" {
  42. c.d.commands = append(c.d.commands, commandName)
  43. }
  44. return c.Conn.Do(commandName, args...)
  45. }
  46. func (c *poolTestConn) Send(commandName string, args ...interface{}) error {
  47. c.d.commands = append(c.d.commands, commandName)
  48. return c.Conn.Send(commandName, args...)
  49. }
  50. type poolDialer struct {
  51. mu sync.Mutex
  52. t *testing.T
  53. dialed int
  54. open int
  55. commands []string
  56. dialErr error
  57. }
  58. func (d *poolDialer) dial() (redis.Conn, error) {
  59. d.mu.Lock()
  60. d.dialed += 1
  61. dialErr := d.dialErr
  62. d.mu.Unlock()
  63. if dialErr != nil {
  64. return nil, d.dialErr
  65. }
  66. c, err := redis.DialDefaultServer()
  67. if err != nil {
  68. return nil, err
  69. }
  70. d.mu.Lock()
  71. d.open += 1
  72. d.mu.Unlock()
  73. return &poolTestConn{d: d, Conn: c}, nil
  74. }
  75. func (d *poolDialer) check(message string, p *redis.Pool, dialed, open, inuse int) {
  76. d.mu.Lock()
  77. if d.dialed != dialed {
  78. d.t.Errorf("%s: dialed=%d, want %d", message, d.dialed, dialed)
  79. }
  80. if d.open != open {
  81. d.t.Errorf("%s: open=%d, want %d", message, d.open, open)
  82. }
  83. stats := p.Stats()
  84. if stats.ActiveCount != open {
  85. d.t.Errorf("%s: active=%d, want %d", message, stats.ActiveCount, open)
  86. }
  87. if stats.IdleCount != open-inuse {
  88. d.t.Errorf("%s: idle=%d, want %d", message, stats.IdleCount, open-inuse)
  89. }
  90. d.mu.Unlock()
  91. }
  92. func TestPoolReuse(t *testing.T) {
  93. d := poolDialer{t: t}
  94. p := &redis.Pool{
  95. MaxIdle: 2,
  96. Dial: d.dial,
  97. }
  98. for i := 0; i < 10; i++ {
  99. c1 := p.Get()
  100. c1.Do("PING")
  101. c2 := p.Get()
  102. c2.Do("PING")
  103. c1.Close()
  104. c2.Close()
  105. }
  106. d.check("before close", p, 2, 2, 0)
  107. p.Close()
  108. d.check("after close", p, 2, 0, 0)
  109. }
  110. func TestPoolMaxIdle(t *testing.T) {
  111. d := poolDialer{t: t}
  112. p := &redis.Pool{
  113. MaxIdle: 2,
  114. Dial: d.dial,
  115. }
  116. defer p.Close()
  117. for i := 0; i < 10; i++ {
  118. c1 := p.Get()
  119. c1.Do("PING")
  120. c2 := p.Get()
  121. c2.Do("PING")
  122. c3 := p.Get()
  123. c3.Do("PING")
  124. c1.Close()
  125. c2.Close()
  126. c3.Close()
  127. }
  128. d.check("before close", p, 12, 2, 0)
  129. p.Close()
  130. d.check("after close", p, 12, 0, 0)
  131. }
  132. func TestPoolError(t *testing.T) {
  133. d := poolDialer{t: t}
  134. p := &redis.Pool{
  135. MaxIdle: 2,
  136. Dial: d.dial,
  137. }
  138. defer p.Close()
  139. c := p.Get()
  140. c.Do("ERR", io.EOF)
  141. if c.Err() == nil {
  142. t.Errorf("expected c.Err() != nil")
  143. }
  144. c.Close()
  145. c = p.Get()
  146. c.Do("ERR", io.EOF)
  147. c.Close()
  148. d.check(".", p, 2, 0, 0)
  149. }
  150. func TestPoolClose(t *testing.T) {
  151. d := poolDialer{t: t}
  152. p := &redis.Pool{
  153. MaxIdle: 2,
  154. Dial: d.dial,
  155. }
  156. defer p.Close()
  157. c1 := p.Get()
  158. c1.Do("PING")
  159. c2 := p.Get()
  160. c2.Do("PING")
  161. c3 := p.Get()
  162. c3.Do("PING")
  163. c1.Close()
  164. if _, err := c1.Do("PING"); err == nil {
  165. t.Errorf("expected error after connection closed")
  166. }
  167. c2.Close()
  168. c2.Close()
  169. p.Close()
  170. d.check("after pool close", p, 3, 1, 1)
  171. if _, err := c1.Do("PING"); err == nil {
  172. t.Errorf("expected error after connection and pool closed")
  173. }
  174. c3.Close()
  175. d.check("after conn close", p, 3, 0, 0)
  176. c1 = p.Get()
  177. if _, err := c1.Do("PING"); err == nil {
  178. t.Errorf("expected error after pool closed")
  179. }
  180. }
  181. func TestPoolClosedConn(t *testing.T) {
  182. d := poolDialer{t: t}
  183. p := &redis.Pool{
  184. MaxIdle: 2,
  185. IdleTimeout: 300 * time.Second,
  186. Dial: d.dial,
  187. }
  188. defer p.Close()
  189. c := p.Get()
  190. if c.Err() != nil {
  191. t.Fatal("get failed")
  192. }
  193. c.Close()
  194. if err := c.Err(); err == nil {
  195. t.Fatal("Err on closed connection did not return error")
  196. }
  197. if _, err := c.Do("PING"); err == nil {
  198. t.Fatal("Do on closed connection did not return error")
  199. }
  200. if err := c.Send("PING"); err == nil {
  201. t.Fatal("Send on closed connection did not return error")
  202. }
  203. if err := c.Flush(); err == nil {
  204. t.Fatal("Flush on closed connection did not return error")
  205. }
  206. if _, err := c.Receive(); err == nil {
  207. t.Fatal("Receive on closed connection did not return error")
  208. }
  209. }
  210. func TestPoolIdleTimeout(t *testing.T) {
  211. d := poolDialer{t: t}
  212. p := &redis.Pool{
  213. MaxIdle: 2,
  214. IdleTimeout: 300 * time.Second,
  215. Dial: d.dial,
  216. }
  217. defer p.Close()
  218. now := time.Now()
  219. redis.SetNowFunc(func() time.Time { return now })
  220. defer redis.SetNowFunc(time.Now)
  221. c := p.Get()
  222. c.Do("PING")
  223. c.Close()
  224. d.check("1", p, 1, 1, 0)
  225. now = now.Add(p.IdleTimeout + 1)
  226. c = p.Get()
  227. c.Do("PING")
  228. c.Close()
  229. d.check("2", p, 2, 1, 0)
  230. }
  231. func TestPoolMaxLifetime(t *testing.T) {
  232. d := poolDialer{t: t}
  233. p := &redis.Pool{
  234. MaxIdle: 2,
  235. MaxConnLifetime: 300 * time.Second,
  236. Dial: d.dial,
  237. }
  238. defer p.Close()
  239. now := time.Now()
  240. redis.SetNowFunc(func() time.Time { return now })
  241. defer redis.SetNowFunc(time.Now)
  242. c := p.Get()
  243. c.Do("PING")
  244. c.Close()
  245. d.check("1", p, 1, 1, 0)
  246. now = now.Add(p.MaxConnLifetime + 1)
  247. c = p.Get()
  248. c.Do("PING")
  249. c.Close()
  250. d.check("2", p, 2, 1, 0)
  251. }
  252. func TestPoolConcurrenSendReceive(t *testing.T) {
  253. p := &redis.Pool{
  254. Dial: redis.DialDefaultServer,
  255. }
  256. defer p.Close()
  257. c := p.Get()
  258. done := make(chan error, 1)
  259. go func() {
  260. _, err := c.Receive()
  261. done <- err
  262. }()
  263. c.Send("PING")
  264. c.Flush()
  265. err := <-done
  266. if err != nil {
  267. t.Fatalf("Receive() returned error %v", err)
  268. }
  269. _, err = c.Do("")
  270. if err != nil {
  271. t.Fatalf("Do() returned error %v", err)
  272. }
  273. c.Close()
  274. }
  275. func TestPoolBorrowCheck(t *testing.T) {
  276. d := poolDialer{t: t}
  277. p := &redis.Pool{
  278. MaxIdle: 2,
  279. Dial: d.dial,
  280. TestOnBorrow: func(redis.Conn, time.Time) error { return redis.Error("BLAH") },
  281. }
  282. defer p.Close()
  283. for i := 0; i < 10; i++ {
  284. c := p.Get()
  285. c.Do("PING")
  286. c.Close()
  287. }
  288. d.check("1", p, 10, 1, 0)
  289. }
  290. func TestPoolMaxActive(t *testing.T) {
  291. d := poolDialer{t: t}
  292. p := &redis.Pool{
  293. MaxIdle: 2,
  294. MaxActive: 2,
  295. Dial: d.dial,
  296. }
  297. defer p.Close()
  298. c1 := p.Get()
  299. c1.Do("PING")
  300. c2 := p.Get()
  301. c2.Do("PING")
  302. d.check("1", p, 2, 2, 2)
  303. c3 := p.Get()
  304. if _, err := c3.Do("PING"); err != redis.ErrPoolExhausted {
  305. t.Errorf("expected pool exhausted")
  306. }
  307. c3.Close()
  308. d.check("2", p, 2, 2, 2)
  309. c2.Close()
  310. d.check("3", p, 2, 2, 1)
  311. c3 = p.Get()
  312. if _, err := c3.Do("PING"); err != nil {
  313. t.Errorf("expected good channel, err=%v", err)
  314. }
  315. c3.Close()
  316. d.check("4", p, 2, 2, 1)
  317. }
  318. func TestPoolMonitorCleanup(t *testing.T) {
  319. d := poolDialer{t: t}
  320. p := &redis.Pool{
  321. MaxIdle: 2,
  322. MaxActive: 2,
  323. Dial: d.dial,
  324. }
  325. defer p.Close()
  326. c := p.Get()
  327. c.Send("MONITOR")
  328. c.Close()
  329. d.check("", p, 1, 0, 0)
  330. }
  331. func TestPoolPubSubCleanup(t *testing.T) {
  332. d := poolDialer{t: t}
  333. p := &redis.Pool{
  334. MaxIdle: 2,
  335. MaxActive: 2,
  336. Dial: d.dial,
  337. }
  338. defer p.Close()
  339. c := p.Get()
  340. c.Send("SUBSCRIBE", "x")
  341. c.Close()
  342. want := []string{"SUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", "ECHO"}
  343. if !reflect.DeepEqual(d.commands, want) {
  344. t.Errorf("got commands %v, want %v", d.commands, want)
  345. }
  346. d.commands = nil
  347. c = p.Get()
  348. c.Send("PSUBSCRIBE", "x*")
  349. c.Close()
  350. want = []string{"PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", "ECHO"}
  351. if !reflect.DeepEqual(d.commands, want) {
  352. t.Errorf("got commands %v, want %v", d.commands, want)
  353. }
  354. d.commands = nil
  355. }
  356. func TestPoolTransactionCleanup(t *testing.T) {
  357. d := poolDialer{t: t}
  358. p := &redis.Pool{
  359. MaxIdle: 2,
  360. MaxActive: 2,
  361. Dial: d.dial,
  362. }
  363. defer p.Close()
  364. c := p.Get()
  365. c.Do("WATCH", "key")
  366. c.Do("PING")
  367. c.Close()
  368. want := []string{"WATCH", "PING", "UNWATCH"}
  369. if !reflect.DeepEqual(d.commands, want) {
  370. t.Errorf("got commands %v, want %v", d.commands, want)
  371. }
  372. d.commands = nil
  373. c = p.Get()
  374. c.Do("WATCH", "key")
  375. c.Do("UNWATCH")
  376. c.Do("PING")
  377. c.Close()
  378. want = []string{"WATCH", "UNWATCH", "PING"}
  379. if !reflect.DeepEqual(d.commands, want) {
  380. t.Errorf("got commands %v, want %v", d.commands, want)
  381. }
  382. d.commands = nil
  383. c = p.Get()
  384. c.Do("WATCH", "key")
  385. c.Do("MULTI")
  386. c.Do("PING")
  387. c.Close()
  388. want = []string{"WATCH", "MULTI", "PING", "DISCARD"}
  389. if !reflect.DeepEqual(d.commands, want) {
  390. t.Errorf("got commands %v, want %v", d.commands, want)
  391. }
  392. d.commands = nil
  393. c = p.Get()
  394. c.Do("WATCH", "key")
  395. c.Do("MULTI")
  396. c.Do("DISCARD")
  397. c.Do("PING")
  398. c.Close()
  399. want = []string{"WATCH", "MULTI", "DISCARD", "PING"}
  400. if !reflect.DeepEqual(d.commands, want) {
  401. t.Errorf("got commands %v, want %v", d.commands, want)
  402. }
  403. d.commands = nil
  404. c = p.Get()
  405. c.Do("WATCH", "key")
  406. c.Do("MULTI")
  407. c.Do("EXEC")
  408. c.Do("PING")
  409. c.Close()
  410. want = []string{"WATCH", "MULTI", "EXEC", "PING"}
  411. if !reflect.DeepEqual(d.commands, want) {
  412. t.Errorf("got commands %v, want %v", d.commands, want)
  413. }
  414. d.commands = nil
  415. }
  416. func startGoroutines(p *redis.Pool, cmd string, args ...interface{}) chan error {
  417. errs := make(chan error, 10)
  418. for i := 0; i < cap(errs); i++ {
  419. go func() {
  420. c := p.Get()
  421. _, err := c.Do(cmd, args...)
  422. c.Close()
  423. errs <- err
  424. }()
  425. }
  426. return errs
  427. }
  428. func TestWaitPool(t *testing.T) {
  429. d := poolDialer{t: t}
  430. p := &redis.Pool{
  431. MaxIdle: 1,
  432. MaxActive: 1,
  433. Dial: d.dial,
  434. Wait: true,
  435. }
  436. defer p.Close()
  437. c := p.Get()
  438. errs := startGoroutines(p, "PING")
  439. d.check("before close", p, 1, 1, 1)
  440. c.Close()
  441. timeout := time.After(2 * time.Second)
  442. for i := 0; i < cap(errs); i++ {
  443. select {
  444. case err := <-errs:
  445. if err != nil {
  446. t.Fatal(err)
  447. }
  448. case <-timeout:
  449. t.Fatalf("timeout waiting for blocked goroutine %d", i)
  450. }
  451. }
  452. d.check("done", p, 1, 1, 0)
  453. }
  454. func TestWaitPoolClose(t *testing.T) {
  455. d := poolDialer{t: t}
  456. p := &redis.Pool{
  457. MaxIdle: 1,
  458. MaxActive: 1,
  459. Dial: d.dial,
  460. Wait: true,
  461. }
  462. defer p.Close()
  463. c := p.Get()
  464. if _, err := c.Do("PING"); err != nil {
  465. t.Fatal(err)
  466. }
  467. errs := startGoroutines(p, "PING")
  468. d.check("before close", p, 1, 1, 1)
  469. p.Close()
  470. timeout := time.After(2 * time.Second)
  471. for i := 0; i < cap(errs); i++ {
  472. select {
  473. case err := <-errs:
  474. switch err {
  475. case nil:
  476. t.Fatal("blocked goroutine did not get error")
  477. case redis.ErrPoolExhausted:
  478. t.Fatal("blocked goroutine got pool exhausted error")
  479. }
  480. case <-timeout:
  481. t.Fatal("timeout waiting for blocked goroutine")
  482. }
  483. }
  484. c.Close()
  485. d.check("done", p, 1, 0, 0)
  486. }
  487. func TestWaitPoolCommandError(t *testing.T) {
  488. testErr := errors.New("test")
  489. d := poolDialer{t: t}
  490. p := &redis.Pool{
  491. MaxIdle: 1,
  492. MaxActive: 1,
  493. Dial: d.dial,
  494. Wait: true,
  495. }
  496. defer p.Close()
  497. c := p.Get()
  498. errs := startGoroutines(p, "ERR", testErr)
  499. d.check("before close", p, 1, 1, 1)
  500. c.Close()
  501. timeout := time.After(2 * time.Second)
  502. for i := 0; i < cap(errs); i++ {
  503. select {
  504. case err := <-errs:
  505. if err != nil {
  506. t.Fatal(err)
  507. }
  508. case <-timeout:
  509. t.Fatalf("timeout waiting for blocked goroutine %d", i)
  510. }
  511. }
  512. d.check("done", p, cap(errs), 0, 0)
  513. }
  514. func TestWaitPoolDialError(t *testing.T) {
  515. testErr := errors.New("test")
  516. d := poolDialer{t: t}
  517. p := &redis.Pool{
  518. MaxIdle: 1,
  519. MaxActive: 1,
  520. Dial: d.dial,
  521. Wait: true,
  522. }
  523. defer p.Close()
  524. c := p.Get()
  525. errs := startGoroutines(p, "ERR", testErr)
  526. d.check("before close", p, 1, 1, 1)
  527. d.dialErr = errors.New("dial")
  528. c.Close()
  529. nilCount := 0
  530. errCount := 0
  531. timeout := time.After(2 * time.Second)
  532. for i := 0; i < cap(errs); i++ {
  533. select {
  534. case err := <-errs:
  535. switch err {
  536. case nil:
  537. nilCount++
  538. case d.dialErr:
  539. errCount++
  540. default:
  541. t.Fatalf("expected dial error or nil, got %v", err)
  542. }
  543. case <-timeout:
  544. t.Fatalf("timeout waiting for blocked goroutine %d", i)
  545. }
  546. }
  547. if nilCount != 1 {
  548. t.Errorf("expected one nil error, got %d", nilCount)
  549. }
  550. if errCount != cap(errs)-1 {
  551. t.Errorf("expected %d dial errors, got %d", cap(errs)-1, errCount)
  552. }
  553. d.check("done", p, cap(errs), 0, 0)
  554. }
  555. // Borrowing requires us to iterate over the idle connections, unlock the pool,
  556. // and perform a blocking operation to check the connection still works. If
  557. // TestOnBorrow fails, we must reacquire the lock and continue iteration. This
  558. // test ensures that iteration will work correctly if multiple threads are
  559. // iterating simultaneously.
  560. func TestLocking_TestOnBorrowFails_PoolDoesntCrash(t *testing.T) {
  561. const count = 100
  562. // First we'll Create a pool where the pilfering of idle connections fails.
  563. d := poolDialer{t: t}
  564. p := &redis.Pool{
  565. MaxIdle: count,
  566. MaxActive: count,
  567. Dial: d.dial,
  568. TestOnBorrow: func(c redis.Conn, t time.Time) error {
  569. return errors.New("No way back into the real world.")
  570. },
  571. }
  572. defer p.Close()
  573. // Fill the pool with idle connections.
  574. conns := make([]redis.Conn, count)
  575. for i := range conns {
  576. conns[i] = p.Get()
  577. }
  578. for i := range conns {
  579. conns[i].Close()
  580. }
  581. // Spawn a bunch of goroutines to thrash the pool.
  582. var wg sync.WaitGroup
  583. wg.Add(count)
  584. for i := 0; i < count; i++ {
  585. go func() {
  586. c := p.Get()
  587. if c.Err() != nil {
  588. t.Errorf("pool get failed: %v", c.Err())
  589. }
  590. c.Close()
  591. wg.Done()
  592. }()
  593. }
  594. wg.Wait()
  595. if d.dialed != count*2 {
  596. t.Errorf("Expected %d dials, got %d", count*2, d.dialed)
  597. }
  598. }
  599. func BenchmarkPoolGet(b *testing.B) {
  600. b.StopTimer()
  601. p := redis.Pool{Dial: redis.DialDefaultServer, MaxIdle: 2}
  602. c := p.Get()
  603. if err := c.Err(); err != nil {
  604. b.Fatal(err)
  605. }
  606. c.Close()
  607. defer p.Close()
  608. b.StartTimer()
  609. for i := 0; i < b.N; i++ {
  610. c = p.Get()
  611. c.Close()
  612. }
  613. }
  614. func BenchmarkPoolGetErr(b *testing.B) {
  615. b.StopTimer()
  616. p := redis.Pool{Dial: redis.DialDefaultServer, MaxIdle: 2}
  617. c := p.Get()
  618. if err := c.Err(); err != nil {
  619. b.Fatal(err)
  620. }
  621. c.Close()
  622. defer p.Close()
  623. b.StartTimer()
  624. for i := 0; i < b.N; i++ {
  625. c = p.Get()
  626. if err := c.Err(); err != nil {
  627. b.Fatal(err)
  628. }
  629. c.Close()
  630. }
  631. }
  632. func BenchmarkPoolGetPing(b *testing.B) {
  633. b.StopTimer()
  634. p := redis.Pool{Dial: redis.DialDefaultServer, MaxIdle: 2}
  635. c := p.Get()
  636. if err := c.Err(); err != nil {
  637. b.Fatal(err)
  638. }
  639. c.Close()
  640. defer p.Close()
  641. b.StartTimer()
  642. for i := 0; i < b.N; i++ {
  643. c = p.Get()
  644. if _, err := c.Do("PING"); err != nil {
  645. b.Fatal(err)
  646. }
  647. c.Close()
  648. }
  649. }