connectionpool.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. package gocql
  2. import (
  3. "log"
  4. "net"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "time"
  9. )
  10. /*ConnectionPool represents the interface gocql will use to work with a collection of connections.
  11. Purpose
  12. The connection pool in gocql opens and closes connections as well as selects an available connection
  13. for gocql to execute a query against. The pool is also respnsible for handling connection errors that
  14. are caught by the connection experiencing the error.
  15. A connection pool should make a copy of the variables used from the ClusterConfig provided to the pool
  16. upon creation. ClusterConfig is a pointer and can be modified after the creation of the pool. This can
  17. lead to issues with variables being modified outside the expectations of the ConnectionPool type.
  18. Example of Single Connection Pool:
  19. type SingleConnection struct {
  20. conn *Conn
  21. cfg *ClusterConfig
  22. }
  23. func NewSingleConnection(cfg *ClusterConfig) ConnectionPool {
  24. addr := strings.TrimSpace(cfg.Hosts[0])
  25. if strings.Index(addr, ":") < 0 {
  26. addr = fmt.Sprintf("%s:%d", addr, cfg.Port)
  27. }
  28. connCfg := ConnConfig{
  29. ProtoVersion: cfg.ProtoVersion,
  30. CQLVersion: cfg.CQLVersion,
  31. Timeout: cfg.Timeout,
  32. NumStreams: cfg.NumStreams,
  33. Compressor: cfg.Compressor,
  34. Authenticator: cfg.Authenticator,
  35. Keepalive: cfg.SocketKeepalive,
  36. }
  37. pool := SingleConnection{cfg:cfg}
  38. pool.conn = Connect(addr,connCfg,pool)
  39. return &pool
  40. }
  41. func (s *SingleConnection) HandleError(conn *Conn, err error, closed bool) {
  42. if closed {
  43. connCfg := ConnConfig{
  44. ProtoVersion: cfg.ProtoVersion,
  45. CQLVersion: cfg.CQLVersion,
  46. Timeout: cfg.Timeout,
  47. NumStreams: cfg.NumStreams,
  48. Compressor: cfg.Compressor,
  49. Authenticator: cfg.Authenticator,
  50. Keepalive: cfg.SocketKeepalive,
  51. }
  52. s.conn = Connect(conn.Address(),connCfg,s)
  53. }
  54. }
  55. func (s *SingleConnection) Pick(qry *Query) *Conn {
  56. if s.conn.isClosed {
  57. return nil
  58. }
  59. return s.conn
  60. }
  61. func (s *SingleConnection) Size() int {
  62. return 1
  63. }
  64. func (s *SingleConnection) Close() {
  65. s.conn.Close()
  66. }
  67. This is a very simple example of a type that exposes the connection pool interface. To assign
  68. this type as the connection pool to use you would assign it to the ClusterConfig like so:
  69. cluster := NewCluster("127.0.0.1")
  70. cluster.ConnPoolType = NewSingleConnection
  71. ...
  72. session, err := cluster.CreateSession()
  73. To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
  74. */
  75. type ConnectionPool interface {
  76. Pick(*Query) *Conn
  77. Size() int
  78. HandleError(*Conn, error, bool)
  79. Close()
  80. SetHosts(host []HostInfo)
  81. }
  82. //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
  83. type NewPoolFunc func(*ClusterConfig) ConnectionPool
  84. //SimplePool is the current implementation of the connection pool inside gocql. This
  85. //pool is meant to be a simple default used by gocql so users can get up and running
  86. //quickly.
  87. type SimplePool struct {
  88. cfg *ClusterConfig
  89. hostPool *RoundRobin
  90. connPool map[string]*RoundRobin
  91. conns map[*Conn]struct{}
  92. keyspace string
  93. hostMu sync.RWMutex
  94. // this is the set of current hosts which the pool will attempt to connect to
  95. hosts map[string]*HostInfo
  96. // protects hostpool, connPoll, conns, quit
  97. mu sync.Mutex
  98. cFillingPool chan int
  99. quit bool
  100. quitWait chan bool
  101. quitOnce sync.Once
  102. }
  103. //NewSimplePool is the function used by gocql to create the simple connection pool.
  104. //This is the default if no other pool type is specified.
  105. func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
  106. pool := &SimplePool{
  107. cfg: cfg,
  108. hostPool: NewRoundRobin(),
  109. connPool: make(map[string]*RoundRobin),
  110. conns: make(map[*Conn]struct{}),
  111. quitWait: make(chan bool),
  112. cFillingPool: make(chan int, 1),
  113. keyspace: cfg.Keyspace,
  114. hosts: make(map[string]*HostInfo),
  115. }
  116. for _, host := range cfg.Hosts {
  117. // seed hosts have unknown topology
  118. // TODO: Handle populating this during SetHosts
  119. pool.hosts[host] = &HostInfo{Peer: host}
  120. }
  121. //Walk through connecting to hosts. As soon as one host connects
  122. //defer the remaining connections to cluster.fillPool()
  123. for i := 0; i < len(cfg.Hosts); i++ {
  124. addr := strings.TrimSpace(cfg.Hosts[i])
  125. if _, _, err := net.SplitHostPort(addr); err != nil {
  126. addr = net.JoinHostPort(addr, strconv.Itoa(cfg.Port))
  127. }
  128. if pool.connect(addr) == nil {
  129. pool.cFillingPool <- 1
  130. go pool.fillPool()
  131. break
  132. }
  133. }
  134. return pool
  135. }
  136. func (c *SimplePool) connect(addr string) error {
  137. cfg := ConnConfig{
  138. ProtoVersion: c.cfg.ProtoVersion,
  139. CQLVersion: c.cfg.CQLVersion,
  140. Timeout: c.cfg.Timeout,
  141. NumStreams: c.cfg.NumStreams,
  142. Compressor: c.cfg.Compressor,
  143. Authenticator: c.cfg.Authenticator,
  144. Keepalive: c.cfg.SocketKeepalive,
  145. SslOpts: c.cfg.SslOpts,
  146. }
  147. conn, err := Connect(addr, cfg, c)
  148. if err != nil {
  149. log.Printf("connect: failed to connect to %q: %v", addr, err)
  150. return err
  151. }
  152. return c.addConn(conn)
  153. }
  154. func (c *SimplePool) addConn(conn *Conn) error {
  155. c.mu.Lock()
  156. defer c.mu.Unlock()
  157. if c.quit {
  158. conn.Close()
  159. return nil
  160. }
  161. //Set the connection's keyspace if any before adding it to the pool
  162. if c.keyspace != "" {
  163. if err := conn.UseKeyspace(c.keyspace); err != nil {
  164. log.Printf("error setting connection keyspace. %v", err)
  165. conn.Close()
  166. return err
  167. }
  168. }
  169. connPool := c.connPool[conn.Address()]
  170. if connPool == nil {
  171. connPool = NewRoundRobin()
  172. c.connPool[conn.Address()] = connPool
  173. c.hostPool.AddNode(connPool)
  174. }
  175. connPool.AddNode(conn)
  176. c.conns[conn] = struct{}{}
  177. return nil
  178. }
  179. //fillPool manages the pool of connections making sure that each host has the correct
  180. //amount of connections defined. Also the method will test a host with one connection
  181. //instead of flooding the host with number of connections defined in the cluster config
  182. func (c *SimplePool) fillPool() {
  183. //Debounce large amounts of requests to fill pool
  184. select {
  185. case <-time.After(1 * time.Millisecond):
  186. return
  187. case <-c.cFillingPool:
  188. defer func() { c.cFillingPool <- 1 }()
  189. }
  190. c.mu.Lock()
  191. isClosed := c.quit
  192. c.mu.Unlock()
  193. //Exit if cluster(session) is closed
  194. if isClosed {
  195. return
  196. }
  197. c.hostMu.RLock()
  198. //Walk through list of defined hosts
  199. for host := range c.hosts {
  200. addr := strings.TrimSpace(host)
  201. if _, _, err := net.SplitHostPort(addr); err != nil {
  202. addr = net.JoinHostPort(addr, strconv.Itoa(c.cfg.Port))
  203. }
  204. numConns := 1
  205. //See if the host already has connections in the pool
  206. c.mu.Lock()
  207. conns, ok := c.connPool[addr]
  208. c.mu.Unlock()
  209. if ok {
  210. //if the host has enough connections just exit
  211. numConns = conns.Size()
  212. if numConns >= c.cfg.NumConns {
  213. continue
  214. }
  215. } else {
  216. //See if the host is reachable
  217. if err := c.connect(addr); err != nil {
  218. continue
  219. }
  220. }
  221. //This is reached if the host is responsive and needs more connections
  222. //Create connections for host synchronously to mitigate flooding the host.
  223. go func(a string, conns int) {
  224. for ; conns < c.cfg.NumConns; conns++ {
  225. c.connect(a)
  226. }
  227. }(addr, numConns)
  228. }
  229. c.hostMu.RUnlock()
  230. }
  231. // Should only be called if c.mu is locked
  232. func (c *SimplePool) removeConnLocked(conn *Conn) {
  233. conn.Close()
  234. connPool := c.connPool[conn.addr]
  235. if connPool == nil {
  236. return
  237. }
  238. connPool.RemoveNode(conn)
  239. if connPool.Size() == 0 {
  240. c.hostPool.RemoveNode(connPool)
  241. delete(c.connPool, conn.addr)
  242. }
  243. delete(c.conns, conn)
  244. }
  245. func (c *SimplePool) removeConn(conn *Conn) {
  246. c.mu.Lock()
  247. defer c.mu.Unlock()
  248. c.removeConnLocked(conn)
  249. }
  250. //HandleError is called by a Connection object to report to the pool an error has occured.
  251. //Logic is then executed within the pool to clean up the erroroneous connection and try to
  252. //top off the pool.
  253. func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) {
  254. if !closed {
  255. // ignore all non-fatal errors
  256. return
  257. }
  258. c.removeConn(conn)
  259. if !c.quit {
  260. go c.fillPool() // top off pool.
  261. }
  262. }
  263. //Pick selects a connection to be used by the query.
  264. func (c *SimplePool) Pick(qry *Query) *Conn {
  265. //Check if connections are available
  266. c.mu.Lock()
  267. conns := len(c.conns)
  268. c.mu.Unlock()
  269. if conns == 0 {
  270. //try to populate the pool before returning.
  271. c.fillPool()
  272. }
  273. return c.hostPool.Pick(qry)
  274. }
  275. //Size returns the number of connections currently active in the pool
  276. func (p *SimplePool) Size() int {
  277. p.mu.Lock()
  278. conns := len(p.conns)
  279. p.mu.Unlock()
  280. return conns
  281. }
  282. //Close kills the pool and all associated connections.
  283. func (c *SimplePool) Close() {
  284. c.quitOnce.Do(func() {
  285. c.mu.Lock()
  286. defer c.mu.Unlock()
  287. c.quit = true
  288. close(c.quitWait)
  289. for conn := range c.conns {
  290. c.removeConnLocked(conn)
  291. }
  292. })
  293. }
  294. func (c *SimplePool) SetHosts(hosts []HostInfo) {
  295. c.hostMu.Lock()
  296. toRemove := make(map[string]struct{})
  297. for k := range c.hosts {
  298. toRemove[k] = struct{}{}
  299. }
  300. for _, host := range hosts {
  301. host := host
  302. delete(toRemove, host.Peer)
  303. // we already have it
  304. if _, ok := c.hosts[host.Peer]; ok {
  305. // TODO: Check rack, dc, token range is consistent, trigger topology change
  306. // update stored host
  307. continue
  308. }
  309. c.hosts[host.Peer] = &host
  310. }
  311. // can we hold c.mu whilst iterating this loop?
  312. for addr := range toRemove {
  313. c.removeHostLocked(addr)
  314. }
  315. c.hostMu.Unlock()
  316. c.fillPool()
  317. }
  318. func (c *SimplePool) removeHostLocked(addr string) {
  319. if _, ok := c.hosts[addr]; !ok {
  320. return
  321. }
  322. delete(c.hosts, addr)
  323. c.mu.Lock()
  324. defer c.mu.Unlock()
  325. if _, ok := c.connPool[addr]; !ok {
  326. return
  327. }
  328. for conn := range c.conns {
  329. if conn.Address() == addr {
  330. c.removeConnLocked(conn)
  331. }
  332. }
  333. }