connectionpool.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "crypto/tls"
  7. "crypto/x509"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "sync"
  15. "time"
  16. )
  17. /*ConnectionPool represents the interface gocql will use to work with a collection of connections.
  18. Purpose
  19. The connection pool in gocql opens and closes connections as well as selects an available connection
  20. for gocql to execute a query against. The pool is also respnsible for handling connection errors that
  21. are caught by the connection experiencing the error.
  22. A connection pool should make a copy of the variables used from the ClusterConfig provided to the pool
  23. upon creation. ClusterConfig is a pointer and can be modified after the creation of the pool. This can
  24. lead to issues with variables being modified outside the expectations of the ConnectionPool type.
  25. Example of Single Connection Pool:
  26. type SingleConnection struct {
  27. conn *Conn
  28. cfg *ClusterConfig
  29. }
  30. func NewSingleConnection(cfg *ClusterConfig) ConnectionPool {
  31. addr := JoinHostPort(cfg.Hosts[0], cfg.Port)
  32. connCfg := ConnConfig{
  33. ProtoVersion: cfg.ProtoVersion,
  34. CQLVersion: cfg.CQLVersion,
  35. Timeout: cfg.Timeout,
  36. NumStreams: cfg.NumStreams,
  37. Compressor: cfg.Compressor,
  38. Authenticator: cfg.Authenticator,
  39. Keepalive: cfg.SocketKeepalive,
  40. }
  41. pool := SingleConnection{cfg:cfg}
  42. pool.conn = Connect(addr,connCfg,pool)
  43. return &pool
  44. }
  45. func (s *SingleConnection) HandleError(conn *Conn, err error, closed bool) {
  46. if closed {
  47. connCfg := ConnConfig{
  48. ProtoVersion: cfg.ProtoVersion,
  49. CQLVersion: cfg.CQLVersion,
  50. Timeout: cfg.Timeout,
  51. NumStreams: cfg.NumStreams,
  52. Compressor: cfg.Compressor,
  53. Authenticator: cfg.Authenticator,
  54. Keepalive: cfg.SocketKeepalive,
  55. }
  56. s.conn = Connect(conn.Address(),connCfg,s)
  57. }
  58. }
  59. func (s *SingleConnection) Pick(qry *Query) *Conn {
  60. if s.conn.isClosed {
  61. return nil
  62. }
  63. return s.conn
  64. }
  65. func (s *SingleConnection) Size() int {
  66. return 1
  67. }
  68. func (s *SingleConnection) Close() {
  69. s.conn.Close()
  70. }
  71. This is a very simple example of a type that exposes the connection pool interface. To assign
  72. this type as the connection pool to use you would assign it to the ClusterConfig like so:
  73. cluster := NewCluster("127.0.0.1")
  74. cluster.ConnPoolType = NewSingleConnection
  75. ...
  76. session, err := cluster.CreateSession()
  77. To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
  78. */
  79. type ConnectionPool interface {
  80. SetHosts
  81. Pick(*Query) *Conn
  82. Size() int
  83. Close()
  84. }
  85. // interface to implement to receive the host information
  86. type SetHosts interface {
  87. SetHosts(hosts []HostInfo)
  88. }
  89. // interface to implement to receive the partitioner value
  90. type SetPartitioner interface {
  91. SetPartitioner(partitioner string)
  92. }
  93. //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
  94. type NewPoolFunc func(*ClusterConfig) (ConnectionPool, error)
  95. //SimplePool is the current implementation of the connection pool inside gocql. This
  96. //pool is meant to be a simple default used by gocql so users can get up and running
  97. //quickly.
  98. type SimplePool struct {
  99. cfg *ClusterConfig
  100. hostPool *RoundRobin
  101. connPool map[string]*RoundRobin
  102. conns map[*Conn]struct{}
  103. keyspace string
  104. hostMu sync.RWMutex
  105. // this is the set of current hosts which the pool will attempt to connect to
  106. hosts map[string]*HostInfo
  107. // protects hostpool, connPoll, conns, quit
  108. mu sync.Mutex
  109. cFillingPool chan int
  110. quit bool
  111. quitWait chan bool
  112. quitOnce sync.Once
  113. tlsConfig *tls.Config
  114. }
  115. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  116. certPool := x509.NewCertPool()
  117. // ca cert is optional
  118. if sslOpts.CaPath != "" {
  119. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  120. if err != nil {
  121. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  122. }
  123. if !certPool.AppendCertsFromPEM(pem) {
  124. return nil, errors.New("connectionpool: failed parsing or CA certs")
  125. }
  126. }
  127. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  128. if err != nil {
  129. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  130. }
  131. config := &tls.Config{
  132. Certificates: []tls.Certificate{mycert},
  133. RootCAs: certPool,
  134. }
  135. config.InsecureSkipVerify = !sslOpts.EnableHostVerification
  136. return config, nil
  137. }
  138. //NewSimplePool is the function used by gocql to create the simple connection pool.
  139. //This is the default if no other pool type is specified.
  140. func NewSimplePool(cfg *ClusterConfig) (ConnectionPool, error) {
  141. pool := &SimplePool{
  142. cfg: cfg,
  143. hostPool: NewRoundRobin(),
  144. connPool: make(map[string]*RoundRobin),
  145. conns: make(map[*Conn]struct{}),
  146. quitWait: make(chan bool),
  147. cFillingPool: make(chan int, 1),
  148. keyspace: cfg.Keyspace,
  149. hosts: make(map[string]*HostInfo),
  150. }
  151. for _, host := range cfg.Hosts {
  152. // seed hosts have unknown topology
  153. // TODO: Handle populating this during SetHosts
  154. pool.hosts[host] = &HostInfo{Peer: host}
  155. }
  156. if cfg.SslOpts != nil {
  157. config, err := setupTLSConfig(cfg.SslOpts)
  158. if err != nil {
  159. return nil, err
  160. }
  161. pool.tlsConfig = config
  162. }
  163. //Walk through connecting to hosts. As soon as one host connects
  164. //defer the remaining connections to cluster.fillPool()
  165. for i := 0; i < len(cfg.Hosts); i++ {
  166. addr := JoinHostPort(cfg.Hosts[i], cfg.Port)
  167. if pool.connect(addr) == nil {
  168. pool.cFillingPool <- 1
  169. go pool.fillPool()
  170. break
  171. }
  172. }
  173. return pool, nil
  174. }
  175. func (c *SimplePool) connect(addr string) error {
  176. cfg := ConnConfig{
  177. ProtoVersion: c.cfg.ProtoVersion,
  178. CQLVersion: c.cfg.CQLVersion,
  179. Timeout: c.cfg.Timeout,
  180. NumStreams: c.cfg.NumStreams,
  181. Compressor: c.cfg.Compressor,
  182. Authenticator: c.cfg.Authenticator,
  183. Keepalive: c.cfg.SocketKeepalive,
  184. tlsConfig: c.tlsConfig,
  185. }
  186. conn, err := Connect(addr, cfg, c)
  187. if err != nil {
  188. log.Printf("connect: failed to connect to %q: %v", addr, err)
  189. return err
  190. }
  191. return c.addConn(conn)
  192. }
  193. func (c *SimplePool) addConn(conn *Conn) error {
  194. c.mu.Lock()
  195. defer c.mu.Unlock()
  196. if c.quit {
  197. conn.Close()
  198. return nil
  199. }
  200. //Set the connection's keyspace if any before adding it to the pool
  201. if c.keyspace != "" {
  202. if err := conn.UseKeyspace(c.keyspace); err != nil {
  203. log.Printf("error setting connection keyspace. %v", err)
  204. conn.Close()
  205. return err
  206. }
  207. }
  208. connPool := c.connPool[conn.Address()]
  209. if connPool == nil {
  210. connPool = NewRoundRobin()
  211. c.connPool[conn.Address()] = connPool
  212. c.hostPool.AddNode(connPool)
  213. }
  214. connPool.AddNode(conn)
  215. c.conns[conn] = struct{}{}
  216. return nil
  217. }
  218. //fillPool manages the pool of connections making sure that each host has the correct
  219. //amount of connections defined. Also the method will test a host with one connection
  220. //instead of flooding the host with number of connections defined in the cluster config
  221. func (c *SimplePool) fillPool() {
  222. //Debounce large amounts of requests to fill pool
  223. select {
  224. case <-time.After(1 * time.Millisecond):
  225. return
  226. case <-c.cFillingPool:
  227. defer func() { c.cFillingPool <- 1 }()
  228. }
  229. c.mu.Lock()
  230. isClosed := c.quit
  231. c.mu.Unlock()
  232. //Exit if cluster(session) is closed
  233. if isClosed {
  234. return
  235. }
  236. c.hostMu.RLock()
  237. //Walk through list of defined hosts
  238. var wg sync.WaitGroup
  239. for host := range c.hosts {
  240. addr := JoinHostPort(host, c.cfg.Port)
  241. numConns := 1
  242. //See if the host already has connections in the pool
  243. c.mu.Lock()
  244. conns, ok := c.connPool[addr]
  245. c.mu.Unlock()
  246. if ok {
  247. //if the host has enough connections just exit
  248. numConns = conns.Size()
  249. if numConns >= c.cfg.NumConns {
  250. continue
  251. }
  252. } else {
  253. //See if the host is reachable
  254. if err := c.connect(addr); err != nil {
  255. continue
  256. }
  257. }
  258. //This is reached if the host is responsive and needs more connections
  259. //Create connections for host synchronously to mitigate flooding the host.
  260. wg.Add(1)
  261. go func(a string, conns int) {
  262. defer wg.Done()
  263. for ; conns < c.cfg.NumConns; conns++ {
  264. c.connect(a)
  265. }
  266. }(addr, numConns)
  267. }
  268. c.hostMu.RUnlock()
  269. //Wait until we're finished connecting to each host before returning
  270. wg.Wait()
  271. }
  272. // Should only be called if c.mu is locked
  273. func (c *SimplePool) removeConnLocked(conn *Conn) {
  274. conn.Close()
  275. connPool := c.connPool[conn.addr]
  276. if connPool == nil {
  277. return
  278. }
  279. connPool.RemoveNode(conn)
  280. if connPool.Size() == 0 {
  281. c.hostPool.RemoveNode(connPool)
  282. delete(c.connPool, conn.addr)
  283. }
  284. delete(c.conns, conn)
  285. }
  286. func (c *SimplePool) removeConn(conn *Conn) {
  287. c.mu.Lock()
  288. defer c.mu.Unlock()
  289. c.removeConnLocked(conn)
  290. }
  291. //HandleError is called by a Connection object to report to the pool an error has occured.
  292. //Logic is then executed within the pool to clean up the erroroneous connection and try to
  293. //top off the pool.
  294. func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) {
  295. if !closed {
  296. // ignore all non-fatal errors
  297. return
  298. }
  299. c.removeConn(conn)
  300. c.mu.Lock()
  301. poolClosed := c.quit
  302. c.mu.Unlock()
  303. if !poolClosed {
  304. go c.fillPool() // top off pool.
  305. }
  306. }
  307. //Pick selects a connection to be used by the query.
  308. func (c *SimplePool) Pick(qry *Query) *Conn {
  309. //Check if connections are available
  310. c.mu.Lock()
  311. conns := len(c.conns)
  312. c.mu.Unlock()
  313. if conns == 0 {
  314. //try to populate the pool before returning.
  315. c.fillPool()
  316. }
  317. return c.hostPool.Pick(qry)
  318. }
  319. //Size returns the number of connections currently active in the pool
  320. func (p *SimplePool) Size() int {
  321. p.mu.Lock()
  322. conns := len(p.conns)
  323. p.mu.Unlock()
  324. return conns
  325. }
  326. //Close kills the pool and all associated connections.
  327. func (c *SimplePool) Close() {
  328. c.quitOnce.Do(func() {
  329. c.mu.Lock()
  330. defer c.mu.Unlock()
  331. c.quit = true
  332. close(c.quitWait)
  333. for conn := range c.conns {
  334. c.removeConnLocked(conn)
  335. }
  336. })
  337. }
  338. func (c *SimplePool) SetHosts(hosts []HostInfo) {
  339. c.hostMu.Lock()
  340. toRemove := make(map[string]struct{})
  341. for k := range c.hosts {
  342. toRemove[k] = struct{}{}
  343. }
  344. for _, host := range hosts {
  345. host := host
  346. delete(toRemove, host.Peer)
  347. // we already have it
  348. if _, ok := c.hosts[host.Peer]; ok {
  349. // TODO: Check rack, dc, token range is consistent, trigger topology change
  350. // update stored host
  351. continue
  352. }
  353. c.hosts[host.Peer] = &host
  354. }
  355. // can we hold c.mu whilst iterating this loop?
  356. for addr := range toRemove {
  357. c.removeHostLocked(addr)
  358. }
  359. c.hostMu.Unlock()
  360. c.fillPool()
  361. }
  362. func (c *SimplePool) removeHostLocked(addr string) {
  363. if _, ok := c.hosts[addr]; !ok {
  364. return
  365. }
  366. delete(c.hosts, addr)
  367. c.mu.Lock()
  368. defer c.mu.Unlock()
  369. if _, ok := c.connPool[addr]; !ok {
  370. return
  371. }
  372. for conn := range c.conns {
  373. if conn.Address() == addr {
  374. c.removeConnLocked(conn)
  375. }
  376. }
  377. }
  378. //NewRoundRobinConnPool creates a connection pool which selects hosts by
  379. //round-robin, and then selects a connection for that host by round-robin.
  380. func NewRoundRobinConnPool(cfg *ClusterConfig) (ConnectionPool, error) {
  381. return NewPolicyConnPool(
  382. cfg,
  383. NewRoundRobinHostPolicy(),
  384. NewRoundRobinConnPolicy,
  385. )
  386. }
  387. //NewTokenAwareConnPool creates a connection pool which selects hosts by
  388. //a token aware policy, and then selects a connection for that host by
  389. //round-robin.
  390. func NewTokenAwareConnPool(cfg *ClusterConfig) (ConnectionPool, error) {
  391. return NewPolicyConnPool(
  392. cfg,
  393. NewTokenAwareHostPolicy(NewRoundRobinHostPolicy()),
  394. NewRoundRobinConnPolicy,
  395. )
  396. }
  397. type policyConnPool struct {
  398. port int
  399. numConns int
  400. connCfg ConnConfig
  401. keyspace string
  402. mu sync.RWMutex
  403. hostPolicy HostSelectionPolicy
  404. connPolicy func() ConnSelectionPolicy
  405. hostConnPools map[string]*hostConnPool
  406. }
  407. //Creates a policy based connection pool. This func isn't meant to be directly
  408. //used as a NewPoolFunc in ClusterConfig, instead a func should be created
  409. //which satisfies the NewPoolFunc type, which calls this func with the desired
  410. //hostPolicy and connPolicy; see NewRoundRobinConnPool or NewTokenAwareConnPool
  411. //for examples.
  412. func NewPolicyConnPool(
  413. cfg *ClusterConfig,
  414. hostPolicy HostSelectionPolicy,
  415. connPolicy func() ConnSelectionPolicy,
  416. ) (ConnectionPool, error) {
  417. var err error
  418. var tlsConfig *tls.Config
  419. if cfg.SslOpts != nil {
  420. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  421. if err != nil {
  422. return nil, err
  423. }
  424. }
  425. // create the pool
  426. pool := &policyConnPool{
  427. port: cfg.Port,
  428. numConns: cfg.NumConns,
  429. connCfg: ConnConfig{
  430. ProtoVersion: cfg.ProtoVersion,
  431. CQLVersion: cfg.CQLVersion,
  432. Timeout: cfg.Timeout,
  433. NumStreams: cfg.NumStreams,
  434. Compressor: cfg.Compressor,
  435. Authenticator: cfg.Authenticator,
  436. Keepalive: cfg.SocketKeepalive,
  437. tlsConfig: tlsConfig,
  438. },
  439. keyspace: cfg.Keyspace,
  440. hostPolicy: hostPolicy,
  441. connPolicy: connPolicy,
  442. hostConnPools: map[string]*hostConnPool{},
  443. }
  444. hosts := make([]HostInfo, len(cfg.Hosts))
  445. for i, hostAddr := range cfg.Hosts {
  446. hosts[i].Peer = hostAddr
  447. }
  448. pool.SetHosts(hosts)
  449. return pool, nil
  450. }
  451. func (p *policyConnPool) SetHosts(hosts []HostInfo) {
  452. p.mu.Lock()
  453. toRemove := make(map[string]struct{})
  454. for addr := range p.hostConnPools {
  455. toRemove[addr] = struct{}{}
  456. }
  457. // TODO connect to hosts in parallel, but wait for pools to be
  458. // created before returning
  459. for i := range hosts {
  460. pool, exists := p.hostConnPools[hosts[i].Peer]
  461. if !exists {
  462. // create a connection pool for the host
  463. pool = newHostConnPool(
  464. hosts[i].Peer,
  465. p.port,
  466. p.numConns,
  467. p.connCfg,
  468. p.keyspace,
  469. p.connPolicy(),
  470. )
  471. p.hostConnPools[hosts[i].Peer] = pool
  472. } else {
  473. // still have this host, so don't remove it
  474. delete(toRemove, hosts[i].Peer)
  475. }
  476. }
  477. for addr := range toRemove {
  478. pool := p.hostConnPools[addr]
  479. delete(p.hostConnPools, addr)
  480. pool.Close()
  481. }
  482. // update the policy
  483. p.hostPolicy.SetHosts(hosts)
  484. p.mu.Unlock()
  485. }
  486. func (p *policyConnPool) SetPartitioner(partitioner string) {
  487. p.hostPolicy.SetPartitioner(partitioner)
  488. }
  489. func (p *policyConnPool) Size() int {
  490. p.mu.RLock()
  491. count := 0
  492. for _, pool := range p.hostConnPools {
  493. count += pool.Size()
  494. }
  495. p.mu.RUnlock()
  496. return count
  497. }
  498. func (p *policyConnPool) Pick(qry *Query) *Conn {
  499. nextHost := p.hostPolicy.Pick(qry)
  500. p.mu.RLock()
  501. var host *HostInfo
  502. var conn *Conn
  503. for conn == nil {
  504. host = nextHost()
  505. if host == nil {
  506. break
  507. }
  508. conn = p.hostConnPools[host.Peer].Pick(qry)
  509. }
  510. p.mu.RUnlock()
  511. return conn
  512. }
  513. func (p *policyConnPool) Close() {
  514. p.mu.Lock()
  515. // remove the hosts from the policy
  516. p.hostPolicy.SetHosts([]HostInfo{})
  517. // close the pools
  518. for addr, pool := range p.hostConnPools {
  519. delete(p.hostConnPools, addr)
  520. pool.Close()
  521. }
  522. p.mu.Unlock()
  523. }
  524. // hostConnPool is a connection pool for a single host.
  525. // Connection selection is based on a provided ConnSelectionPolicy
  526. type hostConnPool struct {
  527. host string
  528. port int
  529. addr string
  530. size int
  531. connCfg ConnConfig
  532. keyspace string
  533. policy ConnSelectionPolicy
  534. // protection for conns, closed, filling
  535. mu sync.RWMutex
  536. conns []*Conn
  537. closed bool
  538. filling bool
  539. }
  540. func newHostConnPool(
  541. host string,
  542. port int,
  543. size int,
  544. connCfg ConnConfig,
  545. keyspace string,
  546. policy ConnSelectionPolicy,
  547. ) *hostConnPool {
  548. pool := &hostConnPool{
  549. host: host,
  550. port: port,
  551. addr: JoinHostPort(host, port),
  552. size: size,
  553. connCfg: connCfg,
  554. keyspace: keyspace,
  555. policy: policy,
  556. conns: make([]*Conn, 0, size),
  557. filling: false,
  558. closed: false,
  559. }
  560. // fill the pool with the initial connections before returning
  561. pool.fill()
  562. return pool
  563. }
  564. // Pick a connection from this connection pool for the given query.
  565. func (pool *hostConnPool) Pick(qry *Query) *Conn {
  566. pool.mu.RLock()
  567. if pool.closed {
  568. pool.mu.RUnlock()
  569. return nil
  570. }
  571. empty := len(pool.conns) == 0
  572. pool.mu.RUnlock()
  573. if empty {
  574. // try to fill the empty pool
  575. go pool.fill()
  576. return nil
  577. }
  578. return pool.policy.Pick(qry)
  579. }
  580. //Size returns the number of connections currently active in the pool
  581. func (pool *hostConnPool) Size() int {
  582. pool.mu.RLock()
  583. defer pool.mu.RUnlock()
  584. return len(pool.conns)
  585. }
  586. //Close the connection pool
  587. func (pool *hostConnPool) Close() {
  588. pool.mu.Lock()
  589. defer pool.mu.Unlock()
  590. if pool.closed {
  591. return
  592. }
  593. pool.closed = true
  594. // drain, but don't wait
  595. go pool.drain()
  596. }
  597. // Fill the connection pool
  598. func (pool *hostConnPool) fill() {
  599. pool.mu.RLock()
  600. // avoid filling a closed pool, or concurrent filling
  601. if pool.closed || pool.filling {
  602. pool.mu.RUnlock()
  603. return
  604. }
  605. // determine the filling work to be done
  606. startCount := len(pool.conns)
  607. fillCount := pool.size - startCount
  608. // avoid filling a full (or overfull) pool
  609. if fillCount <= 0 {
  610. pool.mu.RUnlock()
  611. return
  612. }
  613. // switch from read to write lock
  614. pool.mu.RUnlock()
  615. pool.mu.Lock()
  616. // double check everything since the lock was released
  617. startCount = len(pool.conns)
  618. fillCount = pool.size - startCount
  619. if pool.closed || pool.filling || fillCount <= 0 {
  620. // looks like another goroutine already beat this
  621. // goroutine to the filling
  622. pool.mu.Unlock()
  623. return
  624. }
  625. // ok fill the pool
  626. pool.filling = true
  627. // allow others to access the pool while filling
  628. pool.mu.Unlock()
  629. // only this goroutine should make calls to fill/empty the pool at this
  630. // point until after this routine or its subordinates calls
  631. // fillingStopped
  632. // fill only the first connection synchronously
  633. if startCount == 0 {
  634. err := pool.connect()
  635. pool.logConnectErr(err)
  636. if err != nil {
  637. // probably unreachable host
  638. go pool.fillingStopped()
  639. return
  640. }
  641. // filled one
  642. fillCount--
  643. // connect all connections to this host in sync
  644. for fillCount > 0 {
  645. err := pool.connect()
  646. pool.logConnectErr(err)
  647. // decrement, even on error
  648. fillCount--
  649. }
  650. go pool.fillingStopped()
  651. return
  652. }
  653. // fill the rest of the pool asynchronously
  654. go func() {
  655. for fillCount > 0 {
  656. err := pool.connect()
  657. pool.logConnectErr(err)
  658. // decrement, even on error
  659. fillCount--
  660. }
  661. // mark the end of filling
  662. pool.fillingStopped()
  663. }()
  664. }
  665. func (pool *hostConnPool) logConnectErr(err error) {
  666. if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
  667. // connection refused
  668. // these are typical during a node outage so avoid log spam.
  669. } else if err != nil {
  670. // unexpected error
  671. log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
  672. }
  673. }
  674. // transition back to a not-filling state.
  675. func (pool *hostConnPool) fillingStopped() {
  676. // wait for some time to avoid back-to-back filling
  677. // this provides some time between failed attempts
  678. // to fill the pool for the host to recover
  679. time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
  680. pool.mu.Lock()
  681. pool.filling = false
  682. pool.mu.Unlock()
  683. }
  684. // create a new connection to the host and add it to the pool
  685. func (pool *hostConnPool) connect() error {
  686. // try to connect
  687. conn, err := Connect(pool.addr, pool.connCfg, pool)
  688. if err != nil {
  689. return err
  690. }
  691. if pool.keyspace != "" {
  692. // set the keyspace
  693. if err := conn.UseKeyspace(pool.keyspace); err != nil {
  694. conn.Close()
  695. return err
  696. }
  697. }
  698. // add the Conn to the pool
  699. pool.mu.Lock()
  700. defer pool.mu.Unlock()
  701. if pool.closed {
  702. conn.Close()
  703. return nil
  704. }
  705. pool.conns = append(pool.conns, conn)
  706. pool.policy.SetConns(pool.conns)
  707. return nil
  708. }
  709. // handle any error from a Conn
  710. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  711. if !closed {
  712. // still an open connection, so continue using it
  713. return
  714. }
  715. pool.mu.Lock()
  716. defer pool.mu.Unlock()
  717. if pool.closed {
  718. // pool closed
  719. return
  720. }
  721. // find the connection index
  722. for i, candidate := range pool.conns {
  723. if candidate == conn {
  724. // remove the connection, not preserving order
  725. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  726. // update the policy
  727. pool.policy.SetConns(pool.conns)
  728. // lost a connection, so fill the pool
  729. go pool.fill()
  730. break
  731. }
  732. }
  733. }
  734. // removes and closes all connections from the pool
  735. func (pool *hostConnPool) drain() {
  736. pool.mu.Lock()
  737. defer pool.mu.Unlock()
  738. // empty the pool
  739. conns := pool.conns
  740. pool.conns = pool.conns[:0]
  741. // update the policy
  742. pool.policy.SetConns(pool.conns)
  743. // close the connections
  744. for _, conn := range conns {
  745. conn.Close()
  746. }
  747. }