connectionpool.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. package gocql
  2. import (
  3. "fmt"
  4. "log"
  5. "strings"
  6. "sync"
  7. "time"
  8. )
  9. /*ConnectionPool represents the interface gocql will use to work with a collection of connections.
  10. Purpose
  11. The connection pool in gocql opens and closes connections as well as selects an available connection
  12. for gocql to execute a query against. The pool is also respnsible for handling connection errors that
  13. are caught by the connection experiencing the error.
  14. A connection pool should make a copy of the variables used from the ClusterConfig provided to the pool
  15. upon creation. ClusterConfig is a pointer and can be modified after the creation of the pool. This can
  16. lead to issues with variables being modified outside the expectations of the ConnectionPool type.
  17. Example of Single Connection Pool:
  18. type SingleConnection struct {
  19. conn *Conn
  20. cfg *ClusterConfig
  21. }
  22. func NewSingleConnection(cfg *ClusterConfig) ConnectionPool {
  23. addr := strings.TrimSpace(cfg.Hosts[0])
  24. if strings.Index(addr, ":") < 0 {
  25. addr = fmt.Sprintf("%s:%d", addr, cfg.DefaultPort)
  26. }
  27. connCfg := ConnConfig{
  28. ProtoVersion: cfg.ProtoVersion,
  29. CQLVersion: cfg.CQLVersion,
  30. Timeout: cfg.Timeout,
  31. NumStreams: cfg.NumStreams,
  32. Compressor: cfg.Compressor,
  33. Authenticator: cfg.Authenticator,
  34. Keepalive: cfg.SocketKeepalive,
  35. }
  36. pool := SingleConnection{cfg:cfg}
  37. pool.conn = Connect(addr,connCfg,pool)
  38. return &pool
  39. }
  40. func (s *SingleConnection) HandleError(conn *Conn, err error, closed bool) {
  41. if closed {
  42. connCfg := ConnConfig{
  43. ProtoVersion: cfg.ProtoVersion,
  44. CQLVersion: cfg.CQLVersion,
  45. Timeout: cfg.Timeout,
  46. NumStreams: cfg.NumStreams,
  47. Compressor: cfg.Compressor,
  48. Authenticator: cfg.Authenticator,
  49. Keepalive: cfg.SocketKeepalive,
  50. }
  51. s.conn = Connect(conn.Address(),connCfg,s)
  52. }
  53. }
  54. func (s *SingleConnection) Pick(qry *Query) *Conn {
  55. if s.conn.isClosed {
  56. return nil
  57. }
  58. return s.conn
  59. }
  60. func (s *SingleConnection) Size() int {
  61. return 1
  62. }
  63. func (s *SingleConnection) Close() {
  64. s.conn.Close()
  65. }
  66. This is a very simple example of a type that exposes the connection pool interface. To assign
  67. this type as the connection pool to use you would assign it to the ClusterConfig like so:
  68. cluster := NewCluster("127.0.0.1")
  69. cluster.ConnPoolType = NewSingleConnection
  70. ...
  71. session, err := cluster.CreateSession()
  72. To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
  73. */
  74. type ConnectionPool interface {
  75. Pick(*Query) *Conn
  76. Size() int
  77. HandleError(*Conn, error, bool)
  78. Close()
  79. }
  80. //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
  81. type NewPoolFunc func(*ClusterConfig) ConnectionPool
  82. //SimplePool is the current implementation of the connection pool inside gocql. This
  83. //pool is meant to be a simple default used by gocql so users can get up and running
  84. //quickly.
  85. type SimplePool struct {
  86. cfg *ClusterConfig
  87. hostPool *RoundRobin
  88. connPool map[string]*RoundRobin
  89. conns map[*Conn]struct{}
  90. keyspace string
  91. mu sync.Mutex
  92. cFillingPool chan int
  93. quit bool
  94. quitWait chan bool
  95. quitOnce sync.Once
  96. }
  97. //NewSimplePool is the function used by gocql to create the simple connection pool.
  98. //This is the default if no other pool type is specified.
  99. func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
  100. pool := SimplePool{
  101. cfg: cfg,
  102. hostPool: NewRoundRobin(),
  103. connPool: make(map[string]*RoundRobin),
  104. conns: make(map[*Conn]struct{}),
  105. quitWait: make(chan bool),
  106. cFillingPool: make(chan int, 1),
  107. keyspace: cfg.Keyspace,
  108. }
  109. //Walk through connecting to hosts. As soon as one host connects
  110. //defer the remaining connections to cluster.fillPool()
  111. for i := 0; i < len(cfg.Hosts); i++ {
  112. addr := strings.TrimSpace(cfg.Hosts[i])
  113. if strings.Index(addr, ":") < 0 {
  114. addr = fmt.Sprintf("%s:%d", addr, cfg.DefaultPort)
  115. }
  116. if pool.connect(addr) == nil {
  117. pool.cFillingPool <- 1
  118. go pool.fillPool()
  119. break
  120. }
  121. }
  122. return &pool
  123. }
  124. func (c *SimplePool) connect(addr string) error {
  125. cfg := ConnConfig{
  126. ProtoVersion: c.cfg.ProtoVersion,
  127. CQLVersion: c.cfg.CQLVersion,
  128. Timeout: c.cfg.Timeout,
  129. NumStreams: c.cfg.NumStreams,
  130. Compressor: c.cfg.Compressor,
  131. Authenticator: c.cfg.Authenticator,
  132. Keepalive: c.cfg.SocketKeepalive,
  133. }
  134. for {
  135. conn, err := Connect(addr, cfg, c)
  136. if err != nil {
  137. log.Printf("failed to connect to %q: %v", addr, err)
  138. return err
  139. }
  140. return c.addConn(conn)
  141. }
  142. }
  143. func (c *SimplePool) addConn(conn *Conn) error {
  144. c.mu.Lock()
  145. defer c.mu.Unlock()
  146. if c.quit {
  147. conn.Close()
  148. return nil
  149. }
  150. //Set the connection's keyspace if any before adding it to the pool
  151. if c.keyspace != "" {
  152. if err := conn.UseKeyspace(c.keyspace); err != nil {
  153. log.Printf("error setting connection keyspace. %v", err)
  154. conn.Close()
  155. return err
  156. }
  157. }
  158. connPool := c.connPool[conn.Address()]
  159. if connPool == nil {
  160. connPool = NewRoundRobin()
  161. c.connPool[conn.Address()] = connPool
  162. c.hostPool.AddNode(connPool)
  163. }
  164. connPool.AddNode(conn)
  165. c.conns[conn] = struct{}{}
  166. return nil
  167. }
  168. //fillPool manages the pool of connections making sure that each host has the correct
  169. //amount of connections defined. Also the method will test a host with one connection
  170. //instead of flooding the host with number of connections defined in the cluster config
  171. func (c *SimplePool) fillPool() {
  172. //Debounce large amounts of requests to fill pool
  173. select {
  174. case <-time.After(1 * time.Millisecond):
  175. return
  176. case <-c.cFillingPool:
  177. defer func() { c.cFillingPool <- 1 }()
  178. }
  179. c.mu.Lock()
  180. isClosed := c.quit
  181. c.mu.Unlock()
  182. //Exit if cluster(session) is closed
  183. if isClosed {
  184. return
  185. }
  186. //Walk through list of defined hosts
  187. for i := 0; i < len(c.cfg.Hosts); i++ {
  188. addr := strings.TrimSpace(c.cfg.Hosts[i])
  189. if strings.Index(addr, ":") < 0 {
  190. addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
  191. }
  192. var numConns int = 1
  193. //See if the host already has connections in the pool
  194. c.mu.Lock()
  195. conns, ok := c.connPool[addr]
  196. c.mu.Unlock()
  197. if ok {
  198. //if the host has enough connections just exit
  199. numConns = conns.Size()
  200. if numConns >= c.cfg.NumConns {
  201. continue
  202. }
  203. } else {
  204. //See if the host is reachable
  205. if err := c.connect(addr); err != nil {
  206. continue
  207. }
  208. }
  209. //This is reached if the host is responsive and needs more connections
  210. //Create connections for host synchronously to mitigate flooding the host.
  211. go func(a string, conns int) {
  212. for ; conns < c.cfg.NumConns; conns++ {
  213. c.connect(addr)
  214. }
  215. }(addr, numConns)
  216. }
  217. }
  218. // Should only be called if c.mu is locked
  219. func (c *SimplePool) removeConnLocked(conn *Conn) {
  220. conn.Close()
  221. connPool := c.connPool[conn.addr]
  222. if connPool == nil {
  223. return
  224. }
  225. connPool.RemoveNode(conn)
  226. if connPool.Size() == 0 {
  227. c.hostPool.RemoveNode(connPool)
  228. delete(c.connPool, conn.addr)
  229. }
  230. delete(c.conns, conn)
  231. }
  232. func (c *SimplePool) removeConn(conn *Conn) {
  233. c.mu.Lock()
  234. defer c.mu.Unlock()
  235. c.removeConnLocked(conn)
  236. }
  237. //HandleError is called by a Connection object to report to the pool an error has occured.
  238. //Logic is then executed within the pool to clean up the erroroneous connection and try to
  239. //top off the pool.
  240. func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) {
  241. if !closed {
  242. // ignore all non-fatal errors
  243. return
  244. }
  245. c.removeConn(conn)
  246. if !c.quit {
  247. go c.fillPool() // top off pool.
  248. }
  249. }
  250. //Pick selects a connection to be used by the query.
  251. func (c *SimplePool) Pick(qry *Query) *Conn {
  252. //Check if connections are available
  253. c.mu.Lock()
  254. conns := len(c.conns)
  255. c.mu.Unlock()
  256. if conns == 0 {
  257. //try to populate the pool before returning.
  258. c.fillPool()
  259. }
  260. return c.hostPool.Pick(qry)
  261. }
  262. //Size returns the number of connections currently active in the pool
  263. func (p *SimplePool) Size() int {
  264. p.mu.Lock()
  265. conns := len(p.conns)
  266. p.mu.Unlock()
  267. return conns
  268. }
  269. //Close kills the pool and all associated connections.
  270. func (c *SimplePool) Close() {
  271. c.quitOnce.Do(func() {
  272. c.mu.Lock()
  273. defer c.mu.Unlock()
  274. c.quit = true
  275. close(c.quitWait)
  276. for conn := range c.conns {
  277. c.removeConnLocked(conn)
  278. }
  279. })
  280. }