connectionpool.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "crypto/tls"
  7. "crypto/x509"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "sync"
  15. "time"
  16. )
  17. /*ConnectionPool represents the interface gocql will use to work with a collection of connections.
  18. Purpose
  19. The connection pool in gocql opens and closes connections as well as selects an available connection
  20. for gocql to execute a query against. The pool is also responsible for handling connection errors that
  21. are caught by the connection experiencing the error.
  22. A connection pool should make a copy of the variables used from the ClusterConfig provided to the pool
  23. upon creation. ClusterConfig is a pointer and can be modified after the creation of the pool. This can
  24. lead to issues with variables being modified outside the expectations of the ConnectionPool type.
  25. Example of Single Connection Pool:
  26. type SingleConnection struct {
  27. conn *Conn
  28. cfg *ClusterConfig
  29. }
  30. func NewSingleConnection(cfg *ClusterConfig) ConnectionPool {
  31. addr := JoinHostPort(cfg.Hosts[0], cfg.Port)
  32. connCfg := ConnConfig{
  33. ProtoVersion: cfg.ProtoVersion,
  34. CQLVersion: cfg.CQLVersion,
  35. Timeout: cfg.Timeout,
  36. NumStreams: cfg.NumStreams,
  37. Compressor: cfg.Compressor,
  38. Authenticator: cfg.Authenticator,
  39. Keepalive: cfg.SocketKeepalive,
  40. }
  41. pool := SingleConnection{cfg:cfg}
  42. pool.conn = Connect(addr,connCfg,pool)
  43. return &pool
  44. }
  45. func (s *SingleConnection) HandleError(conn *Conn, err error, closed bool) {
  46. if closed {
  47. connCfg := ConnConfig{
  48. ProtoVersion: cfg.ProtoVersion,
  49. CQLVersion: cfg.CQLVersion,
  50. Timeout: cfg.Timeout,
  51. NumStreams: cfg.NumStreams,
  52. Compressor: cfg.Compressor,
  53. Authenticator: cfg.Authenticator,
  54. Keepalive: cfg.SocketKeepalive,
  55. }
  56. s.conn = Connect(conn.Address(),connCfg,s)
  57. }
  58. }
  59. func (s *SingleConnection) Pick(qry *Query) *Conn {
  60. if s.conn.isClosed {
  61. return nil
  62. }
  63. return s.conn
  64. }
  65. func (s *SingleConnection) Size() int {
  66. return 1
  67. }
  68. func (s *SingleConnection) Close() {
  69. s.conn.Close()
  70. }
  71. This is a very simple example of a type that exposes the connection pool interface. To assign
  72. this type as the connection pool to use you would assign it to the ClusterConfig like so:
  73. cluster := NewCluster("127.0.0.1")
  74. cluster.ConnPoolType = NewSingleConnection
  75. ...
  76. session, err := cluster.CreateSession()
  77. To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
  78. */
  79. type ConnectionPool interface {
  80. SetHosts
  81. Pick(*Query) *Conn
  82. Size() int
  83. Close()
  84. }
  85. // interface to implement to receive the host information
  86. type SetHosts interface {
  87. SetHosts(hosts []HostInfo)
  88. }
  // SetPartitioner is the interface to implement in order to receive the
  // partitioner value.
  type SetPartitioner interface {
  	// SetPartitioner is called with the partitioner value.
  	SetPartitioner(partitioner string)
  }
  93. //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
  94. type NewPoolFunc func(*ClusterConfig) (ConnectionPool, error)
  95. //SimplePool is the current implementation of the connection pool inside gocql. This
  96. //pool is meant to be a simple default used by gocql so users can get up and running
  97. //quickly.
  98. type SimplePool struct {
  99. cfg *ClusterConfig
  100. hostPool *RoundRobin
  101. connPool map[string]*RoundRobin
  102. conns map[*Conn]struct{}
  103. keyspace string
  104. hostMu sync.RWMutex
  105. // this is the set of current hosts which the pool will attempt to connect to
  106. hosts map[string]*HostInfo
  107. // protects hostpool, connPoll, conns, quit
  108. mu sync.Mutex
  109. cFillingPool chan int
  110. quit bool
  111. quitWait chan bool
  112. quitOnce sync.Once
  113. tlsConfig *tls.Config
  114. }
  115. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  116. certPool := x509.NewCertPool()
  117. // ca cert is optional
  118. if sslOpts.CaPath != "" {
  119. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  120. if err != nil {
  121. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  122. }
  123. if !certPool.AppendCertsFromPEM(pem) {
  124. return nil, errors.New("connectionpool: failed parsing or CA certs")
  125. }
  126. }
  127. mycerts := make([]tls.Certificate, 0)
  128. if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
  129. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  130. if err != nil {
  131. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  132. }
  133. mycerts = append(mycerts, mycert)
  134. }
  135. config := &tls.Config{
  136. Certificates: mycerts,
  137. RootCAs: certPool,
  138. }
  139. config.InsecureSkipVerify = !sslOpts.EnableHostVerification
  140. return config, nil
  141. }
  142. //NewSimplePool is the function used by gocql to create the simple connection pool.
  143. //This is the default if no other pool type is specified.
  144. func NewSimplePool(cfg *ClusterConfig) (ConnectionPool, error) {
  145. pool := &SimplePool{
  146. cfg: cfg,
  147. hostPool: NewRoundRobin(),
  148. connPool: make(map[string]*RoundRobin),
  149. conns: make(map[*Conn]struct{}),
  150. quitWait: make(chan bool),
  151. cFillingPool: make(chan int, 1),
  152. keyspace: cfg.Keyspace,
  153. hosts: make(map[string]*HostInfo),
  154. }
  155. for _, host := range cfg.Hosts {
  156. // seed hosts have unknown topology
  157. // TODO: Handle populating this during SetHosts
  158. pool.hosts[host] = &HostInfo{Peer: host}
  159. }
  160. if cfg.SslOpts != nil {
  161. config, err := setupTLSConfig(cfg.SslOpts)
  162. if err != nil {
  163. return nil, err
  164. }
  165. pool.tlsConfig = config
  166. }
  167. //Walk through connecting to hosts. As soon as one host connects
  168. //defer the remaining connections to cluster.fillPool()
  169. for i := 0; i < len(cfg.Hosts); i++ {
  170. addr := JoinHostPort(cfg.Hosts[i], cfg.Port)
  171. if pool.connect(addr) == nil {
  172. pool.cFillingPool <- 1
  173. go pool.fillPool()
  174. break
  175. }
  176. }
  177. return pool, nil
  178. }
  179. func (c *SimplePool) connect(addr string) error {
  180. cfg := ConnConfig{
  181. ProtoVersion: c.cfg.ProtoVersion,
  182. CQLVersion: c.cfg.CQLVersion,
  183. Timeout: c.cfg.Timeout,
  184. NumStreams: c.cfg.NumStreams,
  185. Compressor: c.cfg.Compressor,
  186. Authenticator: c.cfg.Authenticator,
  187. Keepalive: c.cfg.SocketKeepalive,
  188. tlsConfig: c.tlsConfig,
  189. }
  190. conn, err := Connect(addr, cfg, c)
  191. if err != nil {
  192. log.Printf("connect: failed to connect to %q: %v", addr, err)
  193. return err
  194. }
  195. return c.addConn(conn)
  196. }
  197. func (c *SimplePool) addConn(conn *Conn) error {
  198. c.mu.Lock()
  199. defer c.mu.Unlock()
  200. if c.quit {
  201. conn.Close()
  202. return nil
  203. }
  204. //Set the connection's keyspace if any before adding it to the pool
  205. if c.keyspace != "" {
  206. if err := conn.UseKeyspace(c.keyspace); err != nil {
  207. log.Printf("error setting connection keyspace. %v", err)
  208. conn.Close()
  209. return err
  210. }
  211. }
  212. connPool := c.connPool[conn.Address()]
  213. if connPool == nil {
  214. connPool = NewRoundRobin()
  215. c.connPool[conn.Address()] = connPool
  216. c.hostPool.AddNode(connPool)
  217. }
  218. connPool.AddNode(conn)
  219. c.conns[conn] = struct{}{}
  220. return nil
  221. }
  222. //fillPool manages the pool of connections making sure that each host has the correct
  223. //amount of connections defined. Also the method will test a host with one connection
  224. //instead of flooding the host with number of connections defined in the cluster config
  225. func (c *SimplePool) fillPool() {
  226. //Debounce large amounts of requests to fill pool
  227. select {
  228. case <-time.After(1 * time.Millisecond):
  229. return
  230. case <-c.cFillingPool:
  231. defer func() { c.cFillingPool <- 1 }()
  232. }
  233. c.mu.Lock()
  234. isClosed := c.quit
  235. c.mu.Unlock()
  236. //Exit if cluster(session) is closed
  237. if isClosed {
  238. return
  239. }
  240. c.hostMu.RLock()
  241. //Walk through list of defined hosts
  242. var wg sync.WaitGroup
  243. for host := range c.hosts {
  244. addr := JoinHostPort(host, c.cfg.Port)
  245. numConns := 1
  246. //See if the host already has connections in the pool
  247. c.mu.Lock()
  248. conns, ok := c.connPool[addr]
  249. c.mu.Unlock()
  250. if ok {
  251. //if the host has enough connections just exit
  252. numConns = conns.Size()
  253. if numConns >= c.cfg.NumConns {
  254. continue
  255. }
  256. } else {
  257. //See if the host is reachable
  258. if err := c.connect(addr); err != nil {
  259. continue
  260. }
  261. }
  262. //This is reached if the host is responsive and needs more connections
  263. //Create connections for host synchronously to mitigate flooding the host.
  264. wg.Add(1)
  265. go func(a string, conns int) {
  266. defer wg.Done()
  267. for ; conns < c.cfg.NumConns; conns++ {
  268. c.connect(a)
  269. }
  270. }(addr, numConns)
  271. }
  272. c.hostMu.RUnlock()
  273. //Wait until we're finished connecting to each host before returning
  274. wg.Wait()
  275. }
  276. // Should only be called if c.mu is locked
  277. func (c *SimplePool) removeConnLocked(conn *Conn) {
  278. conn.Close()
  279. connPool := c.connPool[conn.addr]
  280. if connPool == nil {
  281. return
  282. }
  283. connPool.RemoveNode(conn)
  284. if connPool.Size() == 0 {
  285. c.hostPool.RemoveNode(connPool)
  286. delete(c.connPool, conn.addr)
  287. }
  288. delete(c.conns, conn)
  289. }
  290. func (c *SimplePool) removeConn(conn *Conn) {
  291. c.mu.Lock()
  292. defer c.mu.Unlock()
  293. c.removeConnLocked(conn)
  294. }
  295. //HandleError is called by a Connection object to report to the pool an error has occured.
  296. //Logic is then executed within the pool to clean up the erroroneous connection and try to
  297. //top off the pool.
  298. func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) {
  299. if !closed {
  300. // ignore all non-fatal errors
  301. return
  302. }
  303. c.removeConn(conn)
  304. c.mu.Lock()
  305. poolClosed := c.quit
  306. c.mu.Unlock()
  307. if !poolClosed {
  308. go c.fillPool() // top off pool.
  309. }
  310. }
  311. //Pick selects a connection to be used by the query.
  312. func (c *SimplePool) Pick(qry *Query) *Conn {
  313. //Check if connections are available
  314. c.mu.Lock()
  315. conns := len(c.conns)
  316. c.mu.Unlock()
  317. if conns == 0 {
  318. //try to populate the pool before returning.
  319. c.fillPool()
  320. }
  321. return c.hostPool.Pick(qry)
  322. }
  323. //Size returns the number of connections currently active in the pool
  324. func (p *SimplePool) Size() int {
  325. p.mu.Lock()
  326. conns := len(p.conns)
  327. p.mu.Unlock()
  328. return conns
  329. }
  330. //Close kills the pool and all associated connections.
  331. func (c *SimplePool) Close() {
  332. c.quitOnce.Do(func() {
  333. c.mu.Lock()
  334. defer c.mu.Unlock()
  335. c.quit = true
  336. close(c.quitWait)
  337. for conn := range c.conns {
  338. c.removeConnLocked(conn)
  339. }
  340. })
  341. }
  342. func (c *SimplePool) SetHosts(hosts []HostInfo) {
  343. c.hostMu.Lock()
  344. toRemove := make(map[string]struct{})
  345. for k := range c.hosts {
  346. toRemove[k] = struct{}{}
  347. }
  348. for _, host := range hosts {
  349. host := host
  350. delete(toRemove, host.Peer)
  351. // we already have it
  352. if _, ok := c.hosts[host.Peer]; ok {
  353. // TODO: Check rack, dc, token range is consistent, trigger topology change
  354. // update stored host
  355. continue
  356. }
  357. c.hosts[host.Peer] = &host
  358. }
  359. // can we hold c.mu whilst iterating this loop?
  360. for addr := range toRemove {
  361. c.removeHostLocked(addr)
  362. }
  363. c.hostMu.Unlock()
  364. c.fillPool()
  365. }
  366. func (c *SimplePool) removeHostLocked(addr string) {
  367. if _, ok := c.hosts[addr]; !ok {
  368. return
  369. }
  370. delete(c.hosts, addr)
  371. c.mu.Lock()
  372. defer c.mu.Unlock()
  373. if _, ok := c.connPool[addr]; !ok {
  374. return
  375. }
  376. for conn := range c.conns {
  377. if conn.Address() == addr {
  378. c.removeConnLocked(conn)
  379. }
  380. }
  381. }
  382. //NewRoundRobinConnPool creates a connection pool which selects hosts by
  383. //round-robin, and then selects a connection for that host by round-robin.
  384. func NewRoundRobinConnPool(cfg *ClusterConfig) (ConnectionPool, error) {
  385. return NewPolicyConnPool(
  386. cfg,
  387. NewRoundRobinHostPolicy(),
  388. NewRoundRobinConnPolicy,
  389. )
  390. }
  391. //NewTokenAwareConnPool creates a connection pool which selects hosts by
  392. //a token aware policy, and then selects a connection for that host by
  393. //round-robin.
  394. func NewTokenAwareConnPool(cfg *ClusterConfig) (ConnectionPool, error) {
  395. return NewPolicyConnPool(
  396. cfg,
  397. NewTokenAwareHostPolicy(NewRoundRobinHostPolicy()),
  398. NewRoundRobinConnPolicy,
  399. )
  400. }
  401. type policyConnPool struct {
  402. port int
  403. numConns int
  404. connCfg ConnConfig
  405. keyspace string
  406. mu sync.RWMutex
  407. hostPolicy HostSelectionPolicy
  408. connPolicy func() ConnSelectionPolicy
  409. hostConnPools map[string]*hostConnPool
  410. }
  411. //Creates a policy based connection pool. This func isn't meant to be directly
  412. //used as a NewPoolFunc in ClusterConfig, instead a func should be created
  413. //which satisfies the NewPoolFunc type, which calls this func with the desired
  414. //hostPolicy and connPolicy; see NewRoundRobinConnPool or NewTokenAwareConnPool
  415. //for examples.
  416. func NewPolicyConnPool(
  417. cfg *ClusterConfig,
  418. hostPolicy HostSelectionPolicy,
  419. connPolicy func() ConnSelectionPolicy,
  420. ) (ConnectionPool, error) {
  421. var err error
  422. var tlsConfig *tls.Config
  423. if cfg.SslOpts != nil {
  424. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  425. if err != nil {
  426. return nil, err
  427. }
  428. }
  429. // create the pool
  430. pool := &policyConnPool{
  431. port: cfg.Port,
  432. numConns: cfg.NumConns,
  433. connCfg: ConnConfig{
  434. ProtoVersion: cfg.ProtoVersion,
  435. CQLVersion: cfg.CQLVersion,
  436. Timeout: cfg.Timeout,
  437. NumStreams: cfg.NumStreams,
  438. Compressor: cfg.Compressor,
  439. Authenticator: cfg.Authenticator,
  440. Keepalive: cfg.SocketKeepalive,
  441. tlsConfig: tlsConfig,
  442. },
  443. keyspace: cfg.Keyspace,
  444. hostPolicy: hostPolicy,
  445. connPolicy: connPolicy,
  446. hostConnPools: map[string]*hostConnPool{},
  447. }
  448. hosts := make([]HostInfo, len(cfg.Hosts))
  449. for i, hostAddr := range cfg.Hosts {
  450. hosts[i].Peer = hostAddr
  451. }
  452. pool.SetHosts(hosts)
  453. return pool, nil
  454. }
  455. func (p *policyConnPool) SetHosts(hosts []HostInfo) {
  456. p.mu.Lock()
  457. toRemove := make(map[string]struct{})
  458. for addr := range p.hostConnPools {
  459. toRemove[addr] = struct{}{}
  460. }
  461. // TODO connect to hosts in parallel, but wait for pools to be
  462. // created before returning
  463. for i := range hosts {
  464. pool, exists := p.hostConnPools[hosts[i].Peer]
  465. if !exists {
  466. // create a connection pool for the host
  467. pool = newHostConnPool(
  468. hosts[i].Peer,
  469. p.port,
  470. p.numConns,
  471. p.connCfg,
  472. p.keyspace,
  473. p.connPolicy(),
  474. )
  475. p.hostConnPools[hosts[i].Peer] = pool
  476. } else {
  477. // still have this host, so don't remove it
  478. delete(toRemove, hosts[i].Peer)
  479. }
  480. }
  481. for addr := range toRemove {
  482. pool := p.hostConnPools[addr]
  483. delete(p.hostConnPools, addr)
  484. pool.Close()
  485. }
  486. // update the policy
  487. p.hostPolicy.SetHosts(hosts)
  488. p.mu.Unlock()
  489. }
  490. func (p *policyConnPool) SetPartitioner(partitioner string) {
  491. p.hostPolicy.SetPartitioner(partitioner)
  492. }
  493. func (p *policyConnPool) Size() int {
  494. p.mu.RLock()
  495. count := 0
  496. for _, pool := range p.hostConnPools {
  497. count += pool.Size()
  498. }
  499. p.mu.RUnlock()
  500. return count
  501. }
  502. func (p *policyConnPool) Pick(qry *Query) *Conn {
  503. nextHost := p.hostPolicy.Pick(qry)
  504. p.mu.RLock()
  505. var host *HostInfo
  506. var conn *Conn
  507. for conn == nil {
  508. host = nextHost()
  509. if host == nil {
  510. break
  511. }
  512. conn = p.hostConnPools[host.Peer].Pick(qry)
  513. }
  514. p.mu.RUnlock()
  515. return conn
  516. }
  517. func (p *policyConnPool) Close() {
  518. p.mu.Lock()
  519. // remove the hosts from the policy
  520. p.hostPolicy.SetHosts([]HostInfo{})
  521. // close the pools
  522. for addr, pool := range p.hostConnPools {
  523. delete(p.hostConnPools, addr)
  524. pool.Close()
  525. }
  526. p.mu.Unlock()
  527. }
  528. // hostConnPool is a connection pool for a single host.
  529. // Connection selection is based on a provided ConnSelectionPolicy
  530. type hostConnPool struct {
  531. host string
  532. port int
  533. addr string
  534. size int
  535. connCfg ConnConfig
  536. keyspace string
  537. policy ConnSelectionPolicy
  538. // protection for conns, closed, filling
  539. mu sync.RWMutex
  540. conns []*Conn
  541. closed bool
  542. filling bool
  543. }
  544. func newHostConnPool(
  545. host string,
  546. port int,
  547. size int,
  548. connCfg ConnConfig,
  549. keyspace string,
  550. policy ConnSelectionPolicy,
  551. ) *hostConnPool {
  552. pool := &hostConnPool{
  553. host: host,
  554. port: port,
  555. addr: JoinHostPort(host, port),
  556. size: size,
  557. connCfg: connCfg,
  558. keyspace: keyspace,
  559. policy: policy,
  560. conns: make([]*Conn, 0, size),
  561. filling: false,
  562. closed: false,
  563. }
  564. // fill the pool with the initial connections before returning
  565. pool.fill()
  566. return pool
  567. }
  568. // Pick a connection from this connection pool for the given query.
  569. func (pool *hostConnPool) Pick(qry *Query) *Conn {
  570. pool.mu.RLock()
  571. if pool.closed {
  572. pool.mu.RUnlock()
  573. return nil
  574. }
  575. empty := len(pool.conns) == 0
  576. pool.mu.RUnlock()
  577. if empty {
  578. // try to fill the empty pool
  579. go pool.fill()
  580. return nil
  581. }
  582. return pool.policy.Pick(qry)
  583. }
  584. //Size returns the number of connections currently active in the pool
  585. func (pool *hostConnPool) Size() int {
  586. pool.mu.RLock()
  587. defer pool.mu.RUnlock()
  588. return len(pool.conns)
  589. }
  590. //Close the connection pool
  591. func (pool *hostConnPool) Close() {
  592. pool.mu.Lock()
  593. defer pool.mu.Unlock()
  594. if pool.closed {
  595. return
  596. }
  597. pool.closed = true
  598. // drain, but don't wait
  599. go pool.drain()
  600. }
  601. // Fill the connection pool
  602. func (pool *hostConnPool) fill() {
  603. pool.mu.RLock()
  604. // avoid filling a closed pool, or concurrent filling
  605. if pool.closed || pool.filling {
  606. pool.mu.RUnlock()
  607. return
  608. }
  609. // determine the filling work to be done
  610. startCount := len(pool.conns)
  611. fillCount := pool.size - startCount
  612. // avoid filling a full (or overfull) pool
  613. if fillCount <= 0 {
  614. pool.mu.RUnlock()
  615. return
  616. }
  617. // switch from read to write lock
  618. pool.mu.RUnlock()
  619. pool.mu.Lock()
  620. // double check everything since the lock was released
  621. startCount = len(pool.conns)
  622. fillCount = pool.size - startCount
  623. if pool.closed || pool.filling || fillCount <= 0 {
  624. // looks like another goroutine already beat this
  625. // goroutine to the filling
  626. pool.mu.Unlock()
  627. return
  628. }
  629. // ok fill the pool
  630. pool.filling = true
  631. // allow others to access the pool while filling
  632. pool.mu.Unlock()
  633. // only this goroutine should make calls to fill/empty the pool at this
  634. // point until after this routine or its subordinates calls
  635. // fillingStopped
  636. // fill only the first connection synchronously
  637. if startCount == 0 {
  638. err := pool.connect()
  639. pool.logConnectErr(err)
  640. if err != nil {
  641. // probably unreachable host
  642. go pool.fillingStopped()
  643. return
  644. }
  645. // filled one
  646. fillCount--
  647. // connect all connections to this host in sync
  648. for fillCount > 0 {
  649. err := pool.connect()
  650. pool.logConnectErr(err)
  651. // decrement, even on error
  652. fillCount--
  653. }
  654. go pool.fillingStopped()
  655. return
  656. }
  657. // fill the rest of the pool asynchronously
  658. go func() {
  659. for fillCount > 0 {
  660. err := pool.connect()
  661. pool.logConnectErr(err)
  662. // decrement, even on error
  663. fillCount--
  664. }
  665. // mark the end of filling
  666. pool.fillingStopped()
  667. }()
  668. }
  669. func (pool *hostConnPool) logConnectErr(err error) {
  670. if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
  671. // connection refused
  672. // these are typical during a node outage so avoid log spam.
  673. } else if err != nil {
  674. // unexpected error
  675. log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
  676. }
  677. }
  678. // transition back to a not-filling state.
  679. func (pool *hostConnPool) fillingStopped() {
  680. // wait for some time to avoid back-to-back filling
  681. // this provides some time between failed attempts
  682. // to fill the pool for the host to recover
  683. time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
  684. pool.mu.Lock()
  685. pool.filling = false
  686. pool.mu.Unlock()
  687. }
  688. // create a new connection to the host and add it to the pool
  689. func (pool *hostConnPool) connect() error {
  690. // try to connect
  691. conn, err := Connect(pool.addr, pool.connCfg, pool)
  692. if err != nil {
  693. return err
  694. }
  695. if pool.keyspace != "" {
  696. // set the keyspace
  697. if err := conn.UseKeyspace(pool.keyspace); err != nil {
  698. conn.Close()
  699. return err
  700. }
  701. }
  702. // add the Conn to the pool
  703. pool.mu.Lock()
  704. defer pool.mu.Unlock()
  705. if pool.closed {
  706. conn.Close()
  707. return nil
  708. }
  709. pool.conns = append(pool.conns, conn)
  710. pool.policy.SetConns(pool.conns)
  711. return nil
  712. }
  713. // handle any error from a Conn
  714. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  715. if !closed {
  716. // still an open connection, so continue using it
  717. return
  718. }
  719. pool.mu.Lock()
  720. defer pool.mu.Unlock()
  721. if pool.closed {
  722. // pool closed
  723. return
  724. }
  725. // find the connection index
  726. for i, candidate := range pool.conns {
  727. if candidate == conn {
  728. // remove the connection, not preserving order
  729. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  730. // update the policy
  731. pool.policy.SetConns(pool.conns)
  732. // lost a connection, so fill the pool
  733. go pool.fill()
  734. break
  735. }
  736. }
  737. }
  738. // removes and closes all connections from the pool
  739. func (pool *hostConnPool) drain() {
  740. pool.mu.Lock()
  741. defer pool.mu.Unlock()
  742. // empty the pool
  743. conns := pool.conns
  744. pool.conns = pool.conns[:0]
  745. // update the policy
  746. pool.policy.SetConns(pool.conns)
  747. // close the connections
  748. for _, conn := range conns {
  749. conn.Close()
  750. }
  751. }