pool.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. // Copyright 2012 Gary Burd
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  4. // not use this file except in compliance with the License. You may obtain
  5. // a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. // License for the specific language governing permissions and limitations
  13. // under the License.
  14. package redis
  15. import (
  16. "container/list"
  17. "errors"
  18. "sync"
  19. "time"
  20. )
  // Package-level values shared by the pool implementation.
  var (
  	// nowFunc supplies the current time. It is a variable rather than a
  	// direct call to time.Now so that tests can substitute their own clock.
  	nowFunc = time.Now

  	// errPoolClosed is the error recorded on a pooled connection once its
  	// Close method has run.
  	errPoolClosed = errors.New("redigo: connection pool closed")
  )
  23. // Pool maintains a pool of connections. The application calls the Get method
  24. // to get a connection from the pool and the connection's Close method to
  25. // return the connection's resources to the pool.
  26. //
  27. // The following example shows how to use a pool in a web application. The
  28. // application creates a pool at application startup and makes it available to
  29. // request handlers, possibly using a global variable:
  30. //
  31. // var server string // host:port of server
  32. // var password string
  33. // ...
  34. //
  35. // pool = &redis.Pool{
  36. // MaxIdle: 3,
  37. // IdleTimeout: 240 * time.Second,
  38. // Dial: func () (redis.Conn, error) {
  39. // c, err := redis.Dial("tcp", server)
  40. // if err != nil {
  41. // return nil, err
  42. // }
  43. // if err := c.Do("AUTH", password); err != nil {
  44. // c.Close()
  45. // return nil, err
  46. // }
  47. // return c, err
  48. // },
  49. // TestOnBorrow: func(c redis.Conn, t time.Time) error {
  50. // _, err := c.Do("PING")
  51. // return err
  52. // },
  53. // }
  54. //
  55. // This pool has a maximum of three connections to the server specified by the
  56. // variable "server". Each connection is authenticated using a password.
  57. //
  58. // A request handler gets a connection from the pool and closes the connection
  59. // when the handler is done:
  60. //
  61. // conn, err := pool.Get()
  62. // defer conn.Close()
  63. // // do something with the connection
  64. type Pool struct {
  65. // Dial is an application supplied function for creating new connections.
  66. Dial func() (Conn, error)
  67. // TestOnBorrow is an optional application supplied function for checking
  68. // the health of an idle connection before the connection is used again by
  69. // the application. Argument t is the time that the connection was returned
  70. // to the pool. If the function returns an error, then the connection is
  71. // closed.
  72. TestOnBorrow func(c Conn, t time.Time) error
  73. // Maximum number of idle connections in the pool.
  74. MaxIdle int
  75. // Close connections after remaining idle for this duration. If the value
  76. // is zero, then idle connections are not closed. Applications should set
  77. // the timeout to a value less than the server's timeout.
  78. IdleTimeout time.Duration
  79. // mu protects fields defined below.
  80. mu sync.Mutex
  81. closed bool
  82. // Stack of idleConn with most recently used at the front.
  83. idle list.List
  84. }
  85. type idleConn struct {
  86. c Conn
  87. t time.Time
  88. }
  89. // NewPool returns a pool that uses newPool to create connections as needed.
  90. // The pool keeps a maximum of maxIdle idle connections.
  91. func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
  92. return &Pool{Dial: newFn, MaxIdle: maxIdle}
  93. }
  94. // Get gets a connection from the pool.
  95. func (p *Pool) Get() Conn {
  96. return &pooledConnection{p: p}
  97. }
  98. // Close releases the resources used by the pool.
  99. func (p *Pool) Close() error {
  100. p.mu.Lock()
  101. idle := p.idle
  102. p.idle.Init()
  103. p.closed = true
  104. p.mu.Unlock()
  105. for e := idle.Front(); e != nil; e = e.Next() {
  106. e.Value.(idleConn).c.Close()
  107. }
  108. return nil
  109. }
  110. // get prunes stale connections and returns a connection from the idle list or
  111. // creates a new connection.
  112. func (p *Pool) get() (Conn, error) {
  113. p.mu.Lock()
  114. if p.closed {
  115. p.mu.Unlock()
  116. return nil, errors.New("redigo: get on closed pool")
  117. }
  118. // Prune stale connections.
  119. if timeout := p.IdleTimeout; timeout > 0 {
  120. for i, n := 0, p.idle.Len(); i < n; i++ {
  121. e := p.idle.Back()
  122. if e == nil {
  123. break
  124. }
  125. ic := e.Value.(idleConn)
  126. if ic.t.Add(timeout).After(nowFunc()) {
  127. break
  128. }
  129. p.idle.Remove(e)
  130. p.mu.Unlock()
  131. ic.c.Close()
  132. p.mu.Lock()
  133. }
  134. }
  135. // Get idle connection.
  136. for i, n := 0, p.idle.Len(); i < n; i++ {
  137. e := p.idle.Front()
  138. if e == nil {
  139. break
  140. }
  141. ic := e.Value.(idleConn)
  142. p.idle.Remove(e)
  143. test := p.TestOnBorrow
  144. p.mu.Unlock()
  145. if test != nil && test(ic.c, ic.t) != nil {
  146. ic.c.Close()
  147. } else {
  148. return ic.c, nil
  149. }
  150. p.mu.Lock()
  151. }
  152. // No idle connection, create new.
  153. dial := p.Dial
  154. p.mu.Unlock()
  155. return dial()
  156. }
  157. func (p *Pool) put(c Conn) error {
  158. p.mu.Lock()
  159. if !p.closed {
  160. p.idle.PushFront(idleConn{t: nowFunc(), c: c})
  161. if p.idle.Len() > p.MaxIdle {
  162. c = p.idle.Remove(p.idle.Back()).(idleConn).c
  163. } else {
  164. c = nil
  165. }
  166. }
  167. p.mu.Unlock()
  168. if c != nil {
  169. return c.Close()
  170. }
  171. return nil
  172. }
  173. type pooledConnection struct {
  174. c Conn
  175. err error
  176. p *Pool
  177. }
  178. func (c *pooledConnection) get() error {
  179. if c.err == nil && c.c == nil {
  180. c.c, c.err = c.p.get()
  181. }
  182. return c.err
  183. }
  184. func (c *pooledConnection) Close() (err error) {
  185. if c.c != nil {
  186. c.c.Do("")
  187. if c.c.Err() != nil {
  188. err = c.c.Close()
  189. } else {
  190. err = c.p.put(c.c)
  191. }
  192. c.c = nil
  193. c.err = errPoolClosed
  194. }
  195. return err
  196. }
  197. func (c *pooledConnection) Err() error {
  198. if err := c.get(); err != nil {
  199. return err
  200. }
  201. return c.c.Err()
  202. }
  203. func (c *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
  204. if err := c.get(); err != nil {
  205. return nil, err
  206. }
  207. return c.c.Do(commandName, args...)
  208. }
  209. func (c *pooledConnection) Send(commandName string, args ...interface{}) error {
  210. if err := c.get(); err != nil {
  211. return err
  212. }
  213. return c.c.Send(commandName, args...)
  214. }
  215. func (c *pooledConnection) Flush() error {
  216. if err := c.get(); err != nil {
  217. return err
  218. }
  219. return c.c.Flush()
  220. }
  221. func (c *pooledConnection) Receive() (reply interface{}, err error) {
  222. if err := c.get(); err != nil {
  223. return nil, err
  224. }
  225. return c.c.Receive()
  226. }