pool.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. // Copyright 2012 Gary Burd
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  4. // not use this file except in compliance with the License. You may obtain
  5. // a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. // License for the specific language governing permissions and limitations
  13. // under the License.
  14. package redis
  15. import (
  16. "bytes"
  17. "context"
  18. "crypto/rand"
  19. "crypto/sha1"
  20. "errors"
  21. "io"
  22. "strconv"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. )
  27. var (
  28. _ ConnWithTimeout = (*activeConn)(nil)
  29. _ ConnWithTimeout = (*errorConn)(nil)
  30. )
  31. var nowFunc = time.Now // for testing
  32. // ErrPoolExhausted is returned from a pool connection method (Do, Send,
  33. // Receive, Flush, Err) when the maximum number of database connections in the
  34. // pool has been reached.
  35. var ErrPoolExhausted = errors.New("redigo: connection pool exhausted")
  36. var (
  37. errPoolClosed = errors.New("redigo: connection pool closed")
  38. errConnClosed = errors.New("redigo: connection closed")
  39. )
  40. // Pool maintains a pool of connections. The application calls the Get method
  41. // to get a connection from the pool and the connection's Close method to
  42. // return the connection's resources to the pool.
  43. //
  44. // The following example shows how to use a pool in a web application. The
  45. // application creates a pool at application startup and makes it available to
  46. // request handlers using a package level variable. The pool configuration used
  47. // here is an example, not a recommendation.
  48. //
  49. // func newPool(addr string) *redis.Pool {
  50. // return &redis.Pool{
  51. // MaxIdle: 3,
  52. // IdleTimeout: 240 * time.Second,
  53. // // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
  54. // Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
  55. // }
  56. // }
  57. //
  58. // var (
  59. // pool *redis.Pool
  60. // redisServer = flag.String("redisServer", ":6379", "")
  61. // )
  62. //
  63. // func main() {
  64. // flag.Parse()
  65. // pool = newPool(*redisServer)
  66. // ...
  67. // }
  68. //
  69. // A request handler gets a connection from the pool and closes the connection
  70. // when the handler is done:
  71. //
  72. // func serveHome(w http.ResponseWriter, r *http.Request) {
  73. // conn := pool.Get()
  74. // defer conn.Close()
  75. // ...
  76. // }
  77. //
  78. // Use the Dial function to authenticate connections with the AUTH command or
  79. // select a database with the SELECT command:
  80. //
  81. // pool := &redis.Pool{
  82. // // Other pool configuration not shown in this example.
  83. // Dial: func () (redis.Conn, error) {
  84. // c, err := redis.Dial("tcp", server)
  85. // if err != nil {
  86. // return nil, err
  87. // }
  88. // if _, err := c.Do("AUTH", password); err != nil {
  89. // c.Close()
  90. // return nil, err
  91. // }
  92. // if _, err := c.Do("SELECT", db); err != nil {
  93. // c.Close()
  94. // return nil, err
  95. // }
  96. // return c, nil
  97. // },
  98. // }
  99. //
  100. // Use the TestOnBorrow function to check the health of an idle connection
  101. // before the connection is returned to the application. This example PINGs
  102. // connections that have been idle more than a minute:
  103. //
  104. // pool := &redis.Pool{
  105. // // Other pool configuration not shown in this example.
  106. // TestOnBorrow: func(c redis.Conn, t time.Time) error {
  107. // if time.Since(t) < time.Minute {
  108. // return nil
  109. // }
  110. // _, err := c.Do("PING")
  111. // return err
  112. // },
  113. // }
  114. //
  115. type Pool struct {
  116. // Dial is an application supplied function for creating and configuring a
  117. // connection.
  118. //
  119. // The connection returned from Dial must not be in a special state
  120. // (subscribed to pubsub channel, transaction started, ...).
  121. Dial func() (Conn, error)
  122. // DialContext is an application supplied function for creating and configuring a
  123. // connection with the given context.
  124. //
  125. // The connection returned from Dial must not be in a special state
  126. // (subscribed to pubsub channel, transaction started, ...).
  127. DialContext func(ctx context.Context) (Conn, error)
  128. // TestOnBorrow is an optional application supplied function for checking
  129. // the health of an idle connection before the connection is used again by
  130. // the application. Argument t is the time that the connection was returned
  131. // to the pool. If the function returns an error, then the connection is
  132. // closed.
  133. TestOnBorrow func(c Conn, t time.Time) error
  134. // Maximum number of idle connections in the pool.
  135. MaxIdle int
  136. // Maximum number of connections allocated by the pool at a given time.
  137. // When zero, there is no limit on the number of connections in the pool.
  138. MaxActive int
  139. // Close connections after remaining idle for this duration. If the value
  140. // is zero, then idle connections are not closed. Applications should set
  141. // the timeout to a value less than the server's timeout.
  142. IdleTimeout time.Duration
  143. // If Wait is true and the pool is at the MaxActive limit, then Get() waits
  144. // for a connection to be returned to the pool before returning.
  145. Wait bool
  146. // Close connections older than this duration. If the value is zero, then
  147. // the pool does not close connections based on age.
  148. MaxConnLifetime time.Duration
  149. chInitialized uint32 // set to 1 when field ch is initialized
  150. mu sync.Mutex // mu protects the following fields
  151. closed bool // set to true when the pool is closed.
  152. active int // the number of open connections in the pool
  153. ch chan struct{} // limits open connections when p.Wait is true
  154. idle idleList // idle connections
  155. waitCount int64 // total number of connections waited for.
  156. waitDuration time.Duration // total time waited for new connections.
  157. }
  158. // NewPool creates a new pool.
  159. //
  160. // Deprecated: Initialize the Pool directory as shown in the example.
  161. func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
  162. return &Pool{Dial: newFn, MaxIdle: maxIdle}
  163. }
  164. // Get gets a connection. The application must close the returned connection.
  165. // This method always returns a valid connection so that applications can defer
  166. // error handling to the first use of the connection. If there is an error
  167. // getting an underlying connection, then the connection Err, Do, Send, Flush
  168. // and Receive methods return that error.
  169. func (p *Pool) Get() Conn {
  170. pc, err := p.get(nil)
  171. if err != nil {
  172. return errorConn{err}
  173. }
  174. return &activeConn{p: p, pc: pc}
  175. }
  176. // GetContext gets a connection using the provided context.
  177. //
  178. // The provided Context must be non-nil. If the context expires before the
  179. // connection is complete, an error is returned. Any expiration on the context
  180. // will not affect the returned connection.
  181. //
  182. // If the function completes without error, then the application must close the
  183. // returned connection.
  184. func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
  185. pc, err := p.get(ctx)
  186. if err != nil {
  187. return errorConn{err}, err
  188. }
  189. return &activeConn{p: p, pc: pc}, nil
  190. }
  191. // PoolStats contains pool statistics.
  192. type PoolStats struct {
  193. // ActiveCount is the number of connections in the pool. The count includes
  194. // idle connections and connections in use.
  195. ActiveCount int
  196. // IdleCount is the number of idle connections in the pool.
  197. IdleCount int
  198. // WaitCount is the total number of connections waited for.
  199. // This value is currently not guaranteed to be 100% accurate.
  200. WaitCount int64
  201. // WaitDuration is the total time blocked waiting for a new connection.
  202. // This value is currently not guaranteed to be 100% accurate.
  203. WaitDuration time.Duration
  204. }
  205. // Stats returns pool's statistics.
  206. func (p *Pool) Stats() PoolStats {
  207. p.mu.Lock()
  208. stats := PoolStats{
  209. ActiveCount: p.active,
  210. IdleCount: p.idle.count,
  211. WaitCount: p.waitCount,
  212. WaitDuration: p.waitDuration,
  213. }
  214. p.mu.Unlock()
  215. return stats
  216. }
  217. // ActiveCount returns the number of connections in the pool. The count
  218. // includes idle connections and connections in use.
  219. func (p *Pool) ActiveCount() int {
  220. p.mu.Lock()
  221. active := p.active
  222. p.mu.Unlock()
  223. return active
  224. }
  225. // IdleCount returns the number of idle connections in the pool.
  226. func (p *Pool) IdleCount() int {
  227. p.mu.Lock()
  228. idle := p.idle.count
  229. p.mu.Unlock()
  230. return idle
  231. }
  232. // Close releases the resources used by the pool.
  233. func (p *Pool) Close() error {
  234. p.mu.Lock()
  235. if p.closed {
  236. p.mu.Unlock()
  237. return nil
  238. }
  239. p.closed = true
  240. p.active -= p.idle.count
  241. pc := p.idle.front
  242. p.idle.count = 0
  243. p.idle.front, p.idle.back = nil, nil
  244. if p.ch != nil {
  245. close(p.ch)
  246. }
  247. p.mu.Unlock()
  248. for ; pc != nil; pc = pc.next {
  249. pc.c.Close()
  250. }
  251. return nil
  252. }
  253. func (p *Pool) lazyInit() {
  254. // Fast path.
  255. if atomic.LoadUint32(&p.chInitialized) == 1 {
  256. return
  257. }
  258. // Slow path.
  259. p.mu.Lock()
  260. if p.chInitialized == 0 {
  261. p.ch = make(chan struct{}, p.MaxActive)
  262. if p.closed {
  263. close(p.ch)
  264. } else {
  265. for i := 0; i < p.MaxActive; i++ {
  266. p.ch <- struct{}{}
  267. }
  268. }
  269. atomic.StoreUint32(&p.chInitialized, 1)
  270. }
  271. p.mu.Unlock()
  272. }
  273. // get prunes stale connections and returns a connection from the idle list or
  274. // creates a new connection.
  275. func (p *Pool) get(ctx context.Context) (*poolConn, error) {
  276. // Handle limit for p.Wait == true.
  277. var waited time.Duration
  278. if p.Wait && p.MaxActive > 0 {
  279. p.lazyInit()
  280. // wait indicates if we believe it will block so its not 100% accurate
  281. // however for stats it should be good enough.
  282. wait := len(p.ch) == 0
  283. var start time.Time
  284. if wait {
  285. start = time.Now()
  286. }
  287. if ctx == nil {
  288. <-p.ch
  289. } else {
  290. select {
  291. case <-p.ch:
  292. case <-ctx.Done():
  293. return nil, ctx.Err()
  294. }
  295. }
  296. if wait {
  297. waited = time.Since(start)
  298. }
  299. }
  300. p.mu.Lock()
  301. if waited > 0 {
  302. p.waitCount++
  303. p.waitDuration += waited
  304. }
  305. // Prune stale connections at the back of the idle list.
  306. if p.IdleTimeout > 0 {
  307. n := p.idle.count
  308. for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
  309. pc := p.idle.back
  310. p.idle.popBack()
  311. p.mu.Unlock()
  312. pc.c.Close()
  313. p.mu.Lock()
  314. p.active--
  315. }
  316. }
  317. // Get idle connection from the front of idle list.
  318. for p.idle.front != nil {
  319. pc := p.idle.front
  320. p.idle.popFront()
  321. p.mu.Unlock()
  322. if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
  323. (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
  324. return pc, nil
  325. }
  326. pc.c.Close()
  327. p.mu.Lock()
  328. p.active--
  329. }
  330. // Check for pool closed before dialing a new connection.
  331. if p.closed {
  332. p.mu.Unlock()
  333. return nil, errors.New("redigo: get on closed pool")
  334. }
  335. // Handle limit for p.Wait == false.
  336. if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
  337. p.mu.Unlock()
  338. return nil, ErrPoolExhausted
  339. }
  340. p.active++
  341. p.mu.Unlock()
  342. c, err := p.dial(ctx)
  343. if err != nil {
  344. c = nil
  345. p.mu.Lock()
  346. p.active--
  347. if p.ch != nil && !p.closed {
  348. p.ch <- struct{}{}
  349. }
  350. p.mu.Unlock()
  351. }
  352. return &poolConn{c: c, created: nowFunc()}, err
  353. }
  354. func (p *Pool) dial(ctx context.Context) (Conn, error) {
  355. if p.DialContext != nil {
  356. return p.DialContext(ctx)
  357. }
  358. if p.Dial != nil {
  359. return p.Dial()
  360. }
  361. return nil, errors.New("redigo: must pass Dial or DialContext to pool")
  362. }
  363. func (p *Pool) put(pc *poolConn, forceClose bool) error {
  364. p.mu.Lock()
  365. if !p.closed && !forceClose {
  366. pc.t = nowFunc()
  367. p.idle.pushFront(pc)
  368. if p.idle.count > p.MaxIdle {
  369. pc = p.idle.back
  370. p.idle.popBack()
  371. } else {
  372. pc = nil
  373. }
  374. }
  375. if pc != nil {
  376. p.mu.Unlock()
  377. pc.c.Close()
  378. p.mu.Lock()
  379. p.active--
  380. }
  381. if p.ch != nil && !p.closed {
  382. p.ch <- struct{}{}
  383. }
  384. p.mu.Unlock()
  385. return nil
  386. }
  387. type activeConn struct {
  388. p *Pool
  389. pc *poolConn
  390. state int
  391. }
  392. var (
  393. sentinel []byte
  394. sentinelOnce sync.Once
  395. )
  396. func initSentinel() {
  397. p := make([]byte, 64)
  398. if _, err := rand.Read(p); err == nil {
  399. sentinel = p
  400. } else {
  401. h := sha1.New()
  402. io.WriteString(h, "Oops, rand failed. Use time instead.")
  403. io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10))
  404. sentinel = h.Sum(nil)
  405. }
  406. }
  407. func (ac *activeConn) Close() error {
  408. pc := ac.pc
  409. if pc == nil {
  410. return nil
  411. }
  412. ac.pc = nil
  413. if ac.state&connectionMultiState != 0 {
  414. pc.c.Send("DISCARD")
  415. ac.state &^= (connectionMultiState | connectionWatchState)
  416. } else if ac.state&connectionWatchState != 0 {
  417. pc.c.Send("UNWATCH")
  418. ac.state &^= connectionWatchState
  419. }
  420. if ac.state&connectionSubscribeState != 0 {
  421. pc.c.Send("UNSUBSCRIBE")
  422. pc.c.Send("PUNSUBSCRIBE")
  423. // To detect the end of the message stream, ask the server to echo
  424. // a sentinel value and read until we see that value.
  425. sentinelOnce.Do(initSentinel)
  426. pc.c.Send("ECHO", sentinel)
  427. pc.c.Flush()
  428. for {
  429. p, err := pc.c.Receive()
  430. if err != nil {
  431. break
  432. }
  433. if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
  434. ac.state &^= connectionSubscribeState
  435. break
  436. }
  437. }
  438. }
  439. pc.c.Do("")
  440. ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
  441. return nil
  442. }
  443. func (ac *activeConn) Err() error {
  444. pc := ac.pc
  445. if pc == nil {
  446. return errConnClosed
  447. }
  448. return pc.c.Err()
  449. }
  450. func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
  451. pc := ac.pc
  452. if pc == nil {
  453. return nil, errConnClosed
  454. }
  455. ci := lookupCommandInfo(commandName)
  456. ac.state = (ac.state | ci.Set) &^ ci.Clear
  457. return pc.c.Do(commandName, args...)
  458. }
  459. func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
  460. pc := ac.pc
  461. if pc == nil {
  462. return nil, errConnClosed
  463. }
  464. cwt, ok := pc.c.(ConnWithTimeout)
  465. if !ok {
  466. return nil, errTimeoutNotSupported
  467. }
  468. ci := lookupCommandInfo(commandName)
  469. ac.state = (ac.state | ci.Set) &^ ci.Clear
  470. return cwt.DoWithTimeout(timeout, commandName, args...)
  471. }
  472. func (ac *activeConn) Send(commandName string, args ...interface{}) error {
  473. pc := ac.pc
  474. if pc == nil {
  475. return errConnClosed
  476. }
  477. ci := lookupCommandInfo(commandName)
  478. ac.state = (ac.state | ci.Set) &^ ci.Clear
  479. return pc.c.Send(commandName, args...)
  480. }
  481. func (ac *activeConn) Flush() error {
  482. pc := ac.pc
  483. if pc == nil {
  484. return errConnClosed
  485. }
  486. return pc.c.Flush()
  487. }
  488. func (ac *activeConn) Receive() (reply interface{}, err error) {
  489. pc := ac.pc
  490. if pc == nil {
  491. return nil, errConnClosed
  492. }
  493. return pc.c.Receive()
  494. }
  495. func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
  496. pc := ac.pc
  497. if pc == nil {
  498. return nil, errConnClosed
  499. }
  500. cwt, ok := pc.c.(ConnWithTimeout)
  501. if !ok {
  502. return nil, errTimeoutNotSupported
  503. }
  504. return cwt.ReceiveWithTimeout(timeout)
  505. }
  506. type errorConn struct{ err error }
  507. func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }
  508. func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) {
  509. return nil, ec.err
  510. }
  511. func (ec errorConn) Send(string, ...interface{}) error { return ec.err }
  512. func (ec errorConn) Err() error { return ec.err }
  513. func (ec errorConn) Close() error { return nil }
  514. func (ec errorConn) Flush() error { return ec.err }
  515. func (ec errorConn) Receive() (interface{}, error) { return nil, ec.err }
  516. func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err }
  517. type idleList struct {
  518. count int
  519. front, back *poolConn
  520. }
  521. type poolConn struct {
  522. c Conn
  523. t time.Time
  524. created time.Time
  525. next, prev *poolConn
  526. }
  527. func (l *idleList) pushFront(pc *poolConn) {
  528. pc.next = l.front
  529. pc.prev = nil
  530. if l.count == 0 {
  531. l.back = pc
  532. } else {
  533. l.front.prev = pc
  534. }
  535. l.front = pc
  536. l.count++
  537. return
  538. }
  539. func (l *idleList) popFront() {
  540. pc := l.front
  541. l.count--
  542. if l.count == 0 {
  543. l.front, l.back = nil, nil
  544. } else {
  545. pc.next.prev = nil
  546. l.front = pc.next
  547. }
  548. pc.next, pc.prev = nil, nil
  549. }
  550. func (l *idleList) popBack() {
  551. pc := l.back
  552. l.count--
  553. if l.count == 0 {
  554. l.front, l.back = nil, nil
  555. } else {
  556. pc.prev.next = nil
  557. l.back = pc.prev
  558. }
  559. pc.next, pc.prev = nil, nil
  560. }