connectionpool.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. package gocql
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "sync"
  10. "time"
  11. )
  12. /*ConnectionPool represents the interface gocql will use to work with a collection of connections.
  13. Purpose
  14. The connection pool in gocql opens and closes connections as well as selects an available connection
  15. for gocql to execute a query against. The pool is also respnsible for handling connection errors that
  16. are caught by the connection experiencing the error.
  17. A connection pool should make a copy of the variables used from the ClusterConfig provided to the pool
  18. upon creation. ClusterConfig is a pointer and can be modified after the creation of the pool. This can
  19. lead to issues with variables being modified outside the expectations of the ConnectionPool type.
  20. Example of Single Connection Pool:
  21. type SingleConnection struct {
  22. conn *Conn
  23. cfg *ClusterConfig
  24. }
  25. func NewSingleConnection(cfg *ClusterConfig) ConnectionPool {
  26. addr := JoinHostPort(cfg.Hosts[0], cfg.Port)
  27. connCfg := ConnConfig{
  28. ProtoVersion: cfg.ProtoVersion,
  29. CQLVersion: cfg.CQLVersion,
  30. Timeout: cfg.Timeout,
  31. NumStreams: cfg.NumStreams,
  32. Compressor: cfg.Compressor,
  33. Authenticator: cfg.Authenticator,
  34. Keepalive: cfg.SocketKeepalive,
  35. }
  36. pool := SingleConnection{cfg:cfg}
  37. pool.conn = Connect(addr,connCfg,pool)
  38. return &pool
  39. }
  40. func (s *SingleConnection) HandleError(conn *Conn, err error, closed bool) {
  41. if closed {
  42. connCfg := ConnConfig{
  43. ProtoVersion: cfg.ProtoVersion,
  44. CQLVersion: cfg.CQLVersion,
  45. Timeout: cfg.Timeout,
  46. NumStreams: cfg.NumStreams,
  47. Compressor: cfg.Compressor,
  48. Authenticator: cfg.Authenticator,
  49. Keepalive: cfg.SocketKeepalive,
  50. }
  51. s.conn = Connect(conn.Address(),connCfg,s)
  52. }
  53. }
  54. func (s *SingleConnection) Pick(qry *Query) *Conn {
  55. if s.conn.isClosed {
  56. return nil
  57. }
  58. return s.conn
  59. }
  60. func (s *SingleConnection) Size() int {
  61. return 1
  62. }
  63. func (s *SingleConnection) Close() {
  64. s.conn.Close()
  65. }
  66. This is a very simple example of a type that exposes the connection pool interface. To assign
  67. this type as the connection pool to use you would assign it to the ClusterConfig like so:
  68. cluster := NewCluster("127.0.0.1")
  69. cluster.ConnPoolType = NewSingleConnection
  70. ...
  71. session, err := cluster.CreateSession()
  72. To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
  73. */
  74. type ConnectionPool interface {
  75. Pick(*Query) *Conn
  76. Size() int
  77. HandleError(*Conn, error, bool)
  78. Close()
  79. SetHosts(host []HostInfo)
  80. }
  81. //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
  82. type NewPoolFunc func(*ClusterConfig) (ConnectionPool, error)
  83. //SimplePool is the current implementation of the connection pool inside gocql. This
  84. //pool is meant to be a simple default used by gocql so users can get up and running
  85. //quickly.
  86. type SimplePool struct {
  87. cfg *ClusterConfig
  88. hostPool *RoundRobin
  89. connPool map[string]*RoundRobin
  90. conns map[*Conn]struct{}
  91. keyspace string
  92. hostMu sync.RWMutex
  93. // this is the set of current hosts which the pool will attempt to connect to
  94. hosts map[string]*HostInfo
  95. // protects hostpool, connPoll, conns, quit
  96. mu sync.Mutex
  97. cFillingPool chan int
  98. quit bool
  99. quitWait chan bool
  100. quitOnce sync.Once
  101. tlsConfig *tls.Config
  102. }
  103. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  104. certPool := x509.NewCertPool()
  105. // ca cert is optional
  106. if sslOpts.CaPath != "" {
  107. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  108. if err != nil {
  109. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  110. }
  111. if !certPool.AppendCertsFromPEM(pem) {
  112. return nil, errors.New("connectionpool: failed parsing or CA certs")
  113. }
  114. }
  115. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  116. if err != nil {
  117. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  118. }
  119. config := &tls.Config{
  120. Certificates: []tls.Certificate{mycert},
  121. RootCAs: certPool,
  122. }
  123. config.InsecureSkipVerify = !sslOpts.EnableHostVerification
  124. return config, nil
  125. }
  126. //NewSimplePool is the function used by gocql to create the simple connection pool.
  127. //This is the default if no other pool type is specified.
  128. func NewSimplePool(cfg *ClusterConfig) (ConnectionPool, error) {
  129. pool := &SimplePool{
  130. cfg: cfg,
  131. hostPool: NewRoundRobin(),
  132. connPool: make(map[string]*RoundRobin),
  133. conns: make(map[*Conn]struct{}),
  134. quitWait: make(chan bool),
  135. cFillingPool: make(chan int, 1),
  136. keyspace: cfg.Keyspace,
  137. hosts: make(map[string]*HostInfo),
  138. }
  139. for _, host := range cfg.Hosts {
  140. // seed hosts have unknown topology
  141. // TODO: Handle populating this during SetHosts
  142. pool.hosts[host] = &HostInfo{Peer: host}
  143. }
  144. if cfg.SslOpts != nil {
  145. config, err := setupTLSConfig(cfg.SslOpts)
  146. if err != nil {
  147. return nil, err
  148. }
  149. pool.tlsConfig = config
  150. }
  151. //Walk through connecting to hosts. As soon as one host connects
  152. //defer the remaining connections to cluster.fillPool()
  153. for i := 0; i < len(cfg.Hosts); i++ {
  154. addr := JoinHostPort(cfg.Hosts[i], cfg.Port)
  155. if pool.connect(addr) == nil {
  156. pool.cFillingPool <- 1
  157. go pool.fillPool()
  158. break
  159. }
  160. }
  161. return pool, nil
  162. }
  163. func (c *SimplePool) connect(addr string) error {
  164. cfg := ConnConfig{
  165. ProtoVersion: c.cfg.ProtoVersion,
  166. CQLVersion: c.cfg.CQLVersion,
  167. Timeout: c.cfg.Timeout,
  168. NumStreams: c.cfg.NumStreams,
  169. Compressor: c.cfg.Compressor,
  170. Authenticator: c.cfg.Authenticator,
  171. Keepalive: c.cfg.SocketKeepalive,
  172. TLSConfig: c.tlsConfig,
  173. }
  174. conn, err := Connect(addr, cfg, c)
  175. if err != nil {
  176. log.Printf("connect: failed to connect to %q: %v", addr, err)
  177. return err
  178. }
  179. return c.addConn(conn)
  180. }
  181. func (c *SimplePool) addConn(conn *Conn) error {
  182. c.mu.Lock()
  183. defer c.mu.Unlock()
  184. if c.quit {
  185. conn.Close()
  186. return nil
  187. }
  188. //Set the connection's keyspace if any before adding it to the pool
  189. if c.keyspace != "" {
  190. if err := conn.UseKeyspace(c.keyspace); err != nil {
  191. log.Printf("error setting connection keyspace. %v", err)
  192. conn.Close()
  193. return err
  194. }
  195. }
  196. connPool := c.connPool[conn.Address()]
  197. if connPool == nil {
  198. connPool = NewRoundRobin()
  199. c.connPool[conn.Address()] = connPool
  200. c.hostPool.AddNode(connPool)
  201. }
  202. connPool.AddNode(conn)
  203. c.conns[conn] = struct{}{}
  204. return nil
  205. }
  206. //fillPool manages the pool of connections making sure that each host has the correct
  207. //amount of connections defined. Also the method will test a host with one connection
  208. //instead of flooding the host with number of connections defined in the cluster config
  209. func (c *SimplePool) fillPool() {
  210. //Debounce large amounts of requests to fill pool
  211. select {
  212. case <-time.After(1 * time.Millisecond):
  213. return
  214. case <-c.cFillingPool:
  215. defer func() { c.cFillingPool <- 1 }()
  216. }
  217. c.mu.Lock()
  218. isClosed := c.quit
  219. c.mu.Unlock()
  220. //Exit if cluster(session) is closed
  221. if isClosed {
  222. return
  223. }
  224. c.hostMu.RLock()
  225. //Walk through list of defined hosts
  226. var wg sync.WaitGroup
  227. for host := range c.hosts {
  228. addr := JoinHostPort(host, c.cfg.Port)
  229. numConns := 1
  230. //See if the host already has connections in the pool
  231. c.mu.Lock()
  232. conns, ok := c.connPool[addr]
  233. c.mu.Unlock()
  234. if ok {
  235. //if the host has enough connections just exit
  236. numConns = conns.Size()
  237. if numConns >= c.cfg.NumConns {
  238. continue
  239. }
  240. } else {
  241. //See if the host is reachable
  242. if err := c.connect(addr); err != nil {
  243. continue
  244. }
  245. }
  246. //This is reached if the host is responsive and needs more connections
  247. //Create connections for host synchronously to mitigate flooding the host.
  248. wg.Add(1)
  249. go func(a string, conns int) {
  250. defer wg.Done()
  251. for ; conns < c.cfg.NumConns; conns++ {
  252. c.connect(a)
  253. }
  254. }(addr, numConns)
  255. }
  256. c.hostMu.RUnlock()
  257. //Wait until we're finished connecting to each host before returning
  258. wg.Wait()
  259. }
  260. // Should only be called if c.mu is locked
  261. func (c *SimplePool) removeConnLocked(conn *Conn) {
  262. conn.Close()
  263. connPool := c.connPool[conn.addr]
  264. if connPool == nil {
  265. return
  266. }
  267. connPool.RemoveNode(conn)
  268. if connPool.Size() == 0 {
  269. c.hostPool.RemoveNode(connPool)
  270. delete(c.connPool, conn.addr)
  271. }
  272. delete(c.conns, conn)
  273. }
  274. func (c *SimplePool) removeConn(conn *Conn) {
  275. c.mu.Lock()
  276. defer c.mu.Unlock()
  277. c.removeConnLocked(conn)
  278. }
  279. //HandleError is called by a Connection object to report to the pool an error has occured.
  280. //Logic is then executed within the pool to clean up the erroroneous connection and try to
  281. //top off the pool.
  282. func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) {
  283. if !closed {
  284. // ignore all non-fatal errors
  285. return
  286. }
  287. c.removeConn(conn)
  288. c.mu.Lock()
  289. poolClosed := c.quit
  290. c.mu.Unlock()
  291. if !poolClosed {
  292. go c.fillPool() // top off pool.
  293. }
  294. }
  295. //Pick selects a connection to be used by the query.
  296. func (c *SimplePool) Pick(qry *Query) *Conn {
  297. //Check if connections are available
  298. c.mu.Lock()
  299. conns := len(c.conns)
  300. c.mu.Unlock()
  301. if conns == 0 {
  302. //try to populate the pool before returning.
  303. c.fillPool()
  304. }
  305. return c.hostPool.Pick(qry)
  306. }
  307. //Size returns the number of connections currently active in the pool
  308. func (p *SimplePool) Size() int {
  309. p.mu.Lock()
  310. conns := len(p.conns)
  311. p.mu.Unlock()
  312. return conns
  313. }
  314. //Close kills the pool and all associated connections.
  315. func (c *SimplePool) Close() {
  316. c.quitOnce.Do(func() {
  317. c.mu.Lock()
  318. defer c.mu.Unlock()
  319. c.quit = true
  320. close(c.quitWait)
  321. for conn := range c.conns {
  322. c.removeConnLocked(conn)
  323. }
  324. })
  325. }
  326. func (c *SimplePool) SetHosts(hosts []HostInfo) {
  327. c.hostMu.Lock()
  328. toRemove := make(map[string]struct{})
  329. for k := range c.hosts {
  330. toRemove[k] = struct{}{}
  331. }
  332. for _, host := range hosts {
  333. host := host
  334. delete(toRemove, host.Peer)
  335. // we already have it
  336. if _, ok := c.hosts[host.Peer]; ok {
  337. // TODO: Check rack, dc, token range is consistent, trigger topology change
  338. // update stored host
  339. continue
  340. }
  341. c.hosts[host.Peer] = &host
  342. }
  343. // can we hold c.mu whilst iterating this loop?
  344. for addr := range toRemove {
  345. c.removeHostLocked(addr)
  346. }
  347. c.hostMu.Unlock()
  348. c.fillPool()
  349. }
  350. func (c *SimplePool) removeHostLocked(addr string) {
  351. if _, ok := c.hosts[addr]; !ok {
  352. return
  353. }
  354. delete(c.hosts, addr)
  355. c.mu.Lock()
  356. defer c.mu.Unlock()
  357. if _, ok := c.connPool[addr]; !ok {
  358. return
  359. }
  360. for conn := range c.conns {
  361. if conn.Address() == addr {
  362. c.removeConnLocked(conn)
  363. }
  364. }
  365. }