connectionpool.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package gocql
  2. import (
  3. "fmt"
  4. "log"
  5. "strings"
  6. "sync"
  7. "time"
  8. )
  9. //ConnectionPool is the interface gocql expects to be exposed for a connection pool.
  10. type ConnectionPool interface {
  11. Pick(*Query) *Conn
  12. Size() int
  13. HandleError(*Conn, error, bool)
  14. Close()
  15. }
  16. //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
  17. type NewPoolFunc func(*ClusterConfig) ConnectionPool
  18. //SimplePool is the current implementation of the connection pool inside gocql. This
  19. //pool is meant to be a simple default used by gocql so users can get up and running
  20. //quickly.
  21. type SimplePool struct {
  22. cfg *ClusterConfig
  23. hostPool *RoundRobin
  24. connPool map[string]*RoundRobin
  25. conns map[*Conn]struct{}
  26. keyspace string
  27. mu sync.Mutex
  28. cFillingPool chan int
  29. quit bool
  30. quitWait chan bool
  31. quitOnce sync.Once
  32. }
  33. //NewSimplePool is the function used by gocql to create the simple connection pool.
  34. //This is the default if no other pool type is specified.
  35. func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
  36. pool := SimplePool{
  37. cfg: cfg,
  38. hostPool: NewRoundRobin(),
  39. connPool: make(map[string]*RoundRobin),
  40. conns: make(map[*Conn]struct{}),
  41. quitWait: make(chan bool),
  42. cFillingPool: make(chan int, 1),
  43. keyspace: cfg.Keyspace,
  44. }
  45. //Walk through connecting to hosts. As soon as one host connects
  46. //defer the remaining connections to cluster.fillPool()
  47. for i := 0; i < len(pool.cfg.Hosts); i++ {
  48. addr := strings.TrimSpace(pool.cfg.Hosts[i])
  49. if strings.Index(addr, ":") < 0 {
  50. addr = fmt.Sprintf("%s:%d", addr, pool.cfg.DefaultPort)
  51. }
  52. if pool.connect(addr) == nil {
  53. pool.cFillingPool <- 1
  54. go pool.fillPool()
  55. break
  56. }
  57. }
  58. return &pool
  59. }
  60. func (c *SimplePool) connect(addr string) error {
  61. cfg := ConnConfig{
  62. ProtoVersion: c.cfg.ProtoVersion,
  63. CQLVersion: c.cfg.CQLVersion,
  64. Timeout: c.cfg.Timeout,
  65. NumStreams: c.cfg.NumStreams,
  66. Compressor: c.cfg.Compressor,
  67. Authenticator: c.cfg.Authenticator,
  68. Keepalive: c.cfg.SocketKeepalive,
  69. }
  70. for {
  71. conn, err := Connect(addr, cfg, c)
  72. if err != nil {
  73. log.Printf("failed to connect to %q: %v", addr, err)
  74. return err
  75. }
  76. return c.addConn(conn)
  77. }
  78. }
  79. func (c *SimplePool) addConn(conn *Conn) error {
  80. c.mu.Lock()
  81. defer c.mu.Unlock()
  82. if c.quit {
  83. conn.Close()
  84. return nil
  85. }
  86. //Set the connection's keyspace if any before adding it to the pool
  87. if c.keyspace != "" {
  88. if err := conn.UseKeyspace(c.keyspace); err != nil {
  89. log.Printf("error setting connection keyspace. %v", err)
  90. conn.Close()
  91. return err
  92. }
  93. }
  94. connPool := c.connPool[conn.Address()]
  95. if connPool == nil {
  96. connPool = NewRoundRobin()
  97. c.connPool[conn.Address()] = connPool
  98. c.hostPool.AddNode(connPool)
  99. }
  100. connPool.AddNode(conn)
  101. c.conns[conn] = struct{}{}
  102. return nil
  103. }
  104. //fillPool manages the pool of connections making sure that each host has the correct
  105. //amount of connections defined. Also the method will test a host with one connection
  106. //instead of flooding the host with number of connections defined in the cluster config
  107. func (c *SimplePool) fillPool() {
  108. //Debounce large amounts of requests to fill pool
  109. select {
  110. case <-time.After(1 * time.Millisecond):
  111. return
  112. case <-c.cFillingPool:
  113. defer func() { c.cFillingPool <- 1 }()
  114. }
  115. c.mu.Lock()
  116. isClosed := c.quit
  117. c.mu.Unlock()
  118. //Exit if cluster(session) is closed
  119. if isClosed {
  120. return
  121. }
  122. //Walk through list of defined hosts
  123. for i := 0; i < len(c.cfg.Hosts); i++ {
  124. addr := strings.TrimSpace(c.cfg.Hosts[i])
  125. if strings.Index(addr, ":") < 0 {
  126. addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
  127. }
  128. var numConns int = 1
  129. //See if the host already has connections in the pool
  130. c.mu.Lock()
  131. conns, ok := c.connPool[addr]
  132. c.mu.Unlock()
  133. if ok {
  134. //if the host has enough connections just exit
  135. numConns = conns.Size()
  136. if numConns >= c.cfg.NumConns {
  137. continue
  138. }
  139. } else {
  140. //See if the host is reachable
  141. if err := c.connect(addr); err != nil {
  142. continue
  143. }
  144. }
  145. //This is reached if the host is responsive and needs more connections
  146. //Create connections for host synchronously to mitigate flooding the host.
  147. go func(a string, conns int) {
  148. for ; conns < c.cfg.NumConns; conns++ {
  149. c.connect(addr)
  150. }
  151. }(addr, numConns)
  152. }
  153. }
  154. // Should only be called if c.mu is locked
  155. func (c *SimplePool) removeConnLocked(conn *Conn) {
  156. conn.Close()
  157. connPool := c.connPool[conn.addr]
  158. if connPool == nil {
  159. return
  160. }
  161. connPool.RemoveNode(conn)
  162. if connPool.Size() == 0 {
  163. c.hostPool.RemoveNode(connPool)
  164. delete(c.connPool, conn.addr)
  165. }
  166. delete(c.conns, conn)
  167. }
  168. func (c *SimplePool) removeConn(conn *Conn) {
  169. c.mu.Lock()
  170. defer c.mu.Unlock()
  171. c.removeConnLocked(conn)
  172. }
  173. //HandleError is called by a Connection object to report to the pool an error has occured.
  174. //Logic is then executed within the pool to clean up the erroroneous connection and try to
  175. //top off the pool.
  176. func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) {
  177. if !closed {
  178. // ignore all non-fatal errors
  179. return
  180. }
  181. c.removeConn(conn)
  182. if !c.quit {
  183. go c.fillPool() // top off pool.
  184. }
  185. }
  186. //Pick selects a connection to be used by the query.
  187. func (c *SimplePool) Pick(qry *Query) *Conn {
  188. //Check if connections are available
  189. c.mu.Lock()
  190. conns := len(c.conns)
  191. c.mu.Unlock()
  192. if conns == 0 {
  193. //try to populate the pool before returning.
  194. c.fillPool()
  195. }
  196. return c.hostPool.Pick(qry)
  197. }
  198. //Size returns the number of connections currently active in the pool
  199. func (p *SimplePool) Size() int {
  200. p.mu.Lock()
  201. conns := len(p.conns)
  202. p.mu.Unlock()
  203. return conns
  204. }
  205. //Close kills the pool and all associated connections.
  206. func (c *SimplePool) Close() {
  207. c.quitOnce.Do(func() {
  208. c.mu.Lock()
  209. defer c.mu.Unlock()
  210. c.quit = true
  211. close(c.quitWait)
  212. for conn := range c.conns {
  213. c.removeConnLocked(conn)
  214. }
  215. })
  216. }