connectionpool.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "crypto/tls"
  7. "crypto/x509"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "sync"
  15. "time"
  16. )
  17. /*ConnectionPool represents the interface gocql will use to work with a collection of connections.
  18. Purpose
  19. The connection pool in gocql opens and closes connections as well as selects an available connection
  20. for gocql to execute a query against. The pool is also respnsible for handling connection errors that
  21. are caught by the connection experiencing the error.
  22. A connection pool should make a copy of the variables used from the ClusterConfig provided to the pool
  23. upon creation. ClusterConfig is a pointer and can be modified after the creation of the pool. This can
  24. lead to issues with variables being modified outside the expectations of the ConnectionPool type.
  25. Example of Single Connection Pool:
  26. type SingleConnection struct {
  27. conn *Conn
  28. cfg *ClusterConfig
  29. }
  30. func NewSingleConnection(cfg *ClusterConfig) ConnectionPool {
  31. addr := JoinHostPort(cfg.Hosts[0], cfg.Port)
  32. connCfg := ConnConfig{
  33. ProtoVersion: cfg.ProtoVersion,
  34. CQLVersion: cfg.CQLVersion,
  35. Timeout: cfg.Timeout,
  36. NumStreams: cfg.NumStreams,
  37. Compressor: cfg.Compressor,
  38. Authenticator: cfg.Authenticator,
  39. Keepalive: cfg.SocketKeepalive,
  40. }
  41. pool := SingleConnection{cfg:cfg}
  42. pool.conn = Connect(addr,connCfg,pool)
  43. return &pool
  44. }
  45. func (s *SingleConnection) HandleError(conn *Conn, err error, closed bool) {
  46. if closed {
  47. connCfg := ConnConfig{
  48. ProtoVersion: cfg.ProtoVersion,
  49. CQLVersion: cfg.CQLVersion,
  50. Timeout: cfg.Timeout,
  51. NumStreams: cfg.NumStreams,
  52. Compressor: cfg.Compressor,
  53. Authenticator: cfg.Authenticator,
  54. Keepalive: cfg.SocketKeepalive,
  55. }
  56. s.conn = Connect(conn.Address(),connCfg,s)
  57. }
  58. }
  59. func (s *SingleConnection) Pick(qry *Query) *Conn {
  60. if s.conn.isClosed {
  61. return nil
  62. }
  63. return s.conn
  64. }
  65. func (s *SingleConnection) Size() int {
  66. return 1
  67. }
  68. func (s *SingleConnection) Close() {
  69. s.conn.Close()
  70. }
  71. This is a very simple example of a type that exposes the connection pool interface. To assign
  72. this type as the connection pool to use you would assign it to the ClusterConfig like so:
  73. cluster := NewCluster("127.0.0.1")
  74. cluster.ConnPoolType = NewSingleConnection
  75. ...
  76. session, err := cluster.CreateSession()
  77. To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
  78. */
  79. type ConnectionPool interface {
  80. SetHosts
  81. Pick(*Query) *Conn
  82. Size() int
  83. Close()
  84. }
  85. // interface to implement to receive the host information
  86. type SetHosts interface {
  87. SetHosts(hosts []HostInfo)
  88. }
  89. // interface to implement to receive the partitioner value
  90. type SetPartitioner interface {
  91. SetPartitioner(partitioner string)
  92. }
  93. //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
  94. type NewPoolFunc func(*ClusterConfig) (ConnectionPool, error)
  95. //SimplePool is the current implementation of the connection pool inside gocql. This
  96. //pool is meant to be a simple default used by gocql so users can get up and running
  97. //quickly.
  98. type SimplePool struct {
  99. cfg *ClusterConfig
  100. hostPool *RoundRobin
  101. connPool map[string]*RoundRobin
  102. conns map[*Conn]struct{}
  103. keyspace string
  104. hostMu sync.RWMutex
  105. // this is the set of current hosts which the pool will attempt to connect to
  106. hosts map[string]*HostInfo
  107. // protects hostpool, connPoll, conns, quit
  108. mu sync.Mutex
  109. cFillingPool chan int
  110. quit bool
  111. quitWait chan bool
  112. quitOnce sync.Once
  113. tlsConfig *tls.Config
  114. }
  115. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  116. // ca cert is optional
  117. if sslOpts.CaPath != "" {
  118. if sslOpts.RootCAs == nil {
  119. sslOpts.RootCAs = x509.NewCertPool()
  120. }
  121. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  122. if err != nil {
  123. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  124. }
  125. if !sslOpts.RootCAs.AppendCertsFromPEM(pem) {
  126. return nil, errors.New("connectionpool: failed parsing or CA certs")
  127. }
  128. }
  129. if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
  130. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  131. if err != nil {
  132. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  133. }
  134. sslOpts.Certificates = append(sslOpts.Certificates, mycert)
  135. }
  136. sslOpts.InsecureSkipVerify = !sslOpts.EnableHostVerification
  137. return &sslOpts.Config, nil
  138. }
  139. //NewSimplePool is the function used by gocql to create the simple connection pool.
  140. //This is the default if no other pool type is specified.
  141. func NewSimplePool(cfg *ClusterConfig) (ConnectionPool, error) {
  142. pool := &SimplePool{
  143. cfg: cfg,
  144. hostPool: NewRoundRobin(),
  145. connPool: make(map[string]*RoundRobin),
  146. conns: make(map[*Conn]struct{}),
  147. quitWait: make(chan bool),
  148. cFillingPool: make(chan int, 1),
  149. keyspace: cfg.Keyspace,
  150. hosts: make(map[string]*HostInfo),
  151. }
  152. for _, host := range cfg.Hosts {
  153. // seed hosts have unknown topology
  154. // TODO: Handle populating this during SetHosts
  155. pool.hosts[host] = &HostInfo{Peer: host}
  156. }
  157. if cfg.SslOpts != nil {
  158. config, err := setupTLSConfig(cfg.SslOpts)
  159. if err != nil {
  160. return nil, err
  161. }
  162. pool.tlsConfig = config
  163. }
  164. //Walk through connecting to hosts. As soon as one host connects
  165. //defer the remaining connections to cluster.fillPool()
  166. for i := 0; i < len(cfg.Hosts); i++ {
  167. addr := JoinHostPort(cfg.Hosts[i], cfg.Port)
  168. if pool.connect(addr) == nil {
  169. pool.cFillingPool <- 1
  170. go pool.fillPool()
  171. break
  172. }
  173. }
  174. return pool, nil
  175. }
  176. func (c *SimplePool) connect(addr string) error {
  177. cfg := ConnConfig{
  178. ProtoVersion: c.cfg.ProtoVersion,
  179. CQLVersion: c.cfg.CQLVersion,
  180. Timeout: c.cfg.Timeout,
  181. NumStreams: c.cfg.NumStreams,
  182. Compressor: c.cfg.Compressor,
  183. Authenticator: c.cfg.Authenticator,
  184. Keepalive: c.cfg.SocketKeepalive,
  185. tlsConfig: c.tlsConfig,
  186. }
  187. conn, err := Connect(addr, cfg, c)
  188. if err != nil {
  189. log.Printf("connect: failed to connect to %q: %v", addr, err)
  190. return err
  191. }
  192. return c.addConn(conn)
  193. }
  194. func (c *SimplePool) addConn(conn *Conn) error {
  195. c.mu.Lock()
  196. defer c.mu.Unlock()
  197. if c.quit {
  198. conn.Close()
  199. return nil
  200. }
  201. //Set the connection's keyspace if any before adding it to the pool
  202. if c.keyspace != "" {
  203. if err := conn.UseKeyspace(c.keyspace); err != nil {
  204. log.Printf("error setting connection keyspace. %v", err)
  205. conn.Close()
  206. return err
  207. }
  208. }
  209. connPool := c.connPool[conn.Address()]
  210. if connPool == nil {
  211. connPool = NewRoundRobin()
  212. c.connPool[conn.Address()] = connPool
  213. c.hostPool.AddNode(connPool)
  214. }
  215. connPool.AddNode(conn)
  216. c.conns[conn] = struct{}{}
  217. return nil
  218. }
  219. //fillPool manages the pool of connections making sure that each host has the correct
  220. //amount of connections defined. Also the method will test a host with one connection
  221. //instead of flooding the host with number of connections defined in the cluster config
  222. func (c *SimplePool) fillPool() {
  223. //Debounce large amounts of requests to fill pool
  224. select {
  225. case <-time.After(1 * time.Millisecond):
  226. return
  227. case <-c.cFillingPool:
  228. defer func() { c.cFillingPool <- 1 }()
  229. }
  230. c.mu.Lock()
  231. isClosed := c.quit
  232. c.mu.Unlock()
  233. //Exit if cluster(session) is closed
  234. if isClosed {
  235. return
  236. }
  237. c.hostMu.RLock()
  238. //Walk through list of defined hosts
  239. var wg sync.WaitGroup
  240. for host := range c.hosts {
  241. addr := JoinHostPort(host, c.cfg.Port)
  242. numConns := 1
  243. //See if the host already has connections in the pool
  244. c.mu.Lock()
  245. conns, ok := c.connPool[addr]
  246. c.mu.Unlock()
  247. if ok {
  248. //if the host has enough connections just exit
  249. numConns = conns.Size()
  250. if numConns >= c.cfg.NumConns {
  251. continue
  252. }
  253. } else {
  254. //See if the host is reachable
  255. if err := c.connect(addr); err != nil {
  256. continue
  257. }
  258. }
  259. //This is reached if the host is responsive and needs more connections
  260. //Create connections for host synchronously to mitigate flooding the host.
  261. wg.Add(1)
  262. go func(a string, conns int) {
  263. defer wg.Done()
  264. for ; conns < c.cfg.NumConns; conns++ {
  265. c.connect(a)
  266. }
  267. }(addr, numConns)
  268. }
  269. c.hostMu.RUnlock()
  270. //Wait until we're finished connecting to each host before returning
  271. wg.Wait()
  272. }
  273. // Should only be called if c.mu is locked
  274. func (c *SimplePool) removeConnLocked(conn *Conn) {
  275. conn.Close()
  276. connPool := c.connPool[conn.addr]
  277. if connPool == nil {
  278. return
  279. }
  280. connPool.RemoveNode(conn)
  281. if connPool.Size() == 0 {
  282. c.hostPool.RemoveNode(connPool)
  283. delete(c.connPool, conn.addr)
  284. }
  285. delete(c.conns, conn)
  286. }
  287. func (c *SimplePool) removeConn(conn *Conn) {
  288. c.mu.Lock()
  289. defer c.mu.Unlock()
  290. c.removeConnLocked(conn)
  291. }
  292. //HandleError is called by a Connection object to report to the pool an error has occured.
  293. //Logic is then executed within the pool to clean up the erroroneous connection and try to
  294. //top off the pool.
  295. func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) {
  296. if !closed {
  297. // ignore all non-fatal errors
  298. return
  299. }
  300. c.removeConn(conn)
  301. c.mu.Lock()
  302. poolClosed := c.quit
  303. c.mu.Unlock()
  304. if !poolClosed {
  305. go c.fillPool() // top off pool.
  306. }
  307. }
  308. //Pick selects a connection to be used by the query.
  309. func (c *SimplePool) Pick(qry *Query) *Conn {
  310. //Check if connections are available
  311. c.mu.Lock()
  312. conns := len(c.conns)
  313. c.mu.Unlock()
  314. if conns == 0 {
  315. //try to populate the pool before returning.
  316. c.fillPool()
  317. }
  318. return c.hostPool.Pick(qry)
  319. }
  320. //Size returns the number of connections currently active in the pool
  321. func (p *SimplePool) Size() int {
  322. p.mu.Lock()
  323. conns := len(p.conns)
  324. p.mu.Unlock()
  325. return conns
  326. }
  327. //Close kills the pool and all associated connections.
  328. func (c *SimplePool) Close() {
  329. c.quitOnce.Do(func() {
  330. c.mu.Lock()
  331. defer c.mu.Unlock()
  332. c.quit = true
  333. close(c.quitWait)
  334. for conn := range c.conns {
  335. c.removeConnLocked(conn)
  336. }
  337. })
  338. }
  339. func (c *SimplePool) SetHosts(hosts []HostInfo) {
  340. c.hostMu.Lock()
  341. toRemove := make(map[string]struct{})
  342. for k := range c.hosts {
  343. toRemove[k] = struct{}{}
  344. }
  345. for _, host := range hosts {
  346. host := host
  347. delete(toRemove, host.Peer)
  348. // we already have it
  349. if _, ok := c.hosts[host.Peer]; ok {
  350. // TODO: Check rack, dc, token range is consistent, trigger topology change
  351. // update stored host
  352. continue
  353. }
  354. c.hosts[host.Peer] = &host
  355. }
  356. // can we hold c.mu whilst iterating this loop?
  357. for addr := range toRemove {
  358. c.removeHostLocked(addr)
  359. }
  360. c.hostMu.Unlock()
  361. c.fillPool()
  362. }
  363. func (c *SimplePool) removeHostLocked(addr string) {
  364. if _, ok := c.hosts[addr]; !ok {
  365. return
  366. }
  367. delete(c.hosts, addr)
  368. c.mu.Lock()
  369. defer c.mu.Unlock()
  370. if _, ok := c.connPool[addr]; !ok {
  371. return
  372. }
  373. for conn := range c.conns {
  374. if conn.Address() == addr {
  375. c.removeConnLocked(conn)
  376. }
  377. }
  378. }
  379. //NewRoundRobinConnPool creates a connection pool which selects hosts by
  380. //round-robin, and then selects a connection for that host by round-robin.
  381. func NewRoundRobinConnPool(cfg *ClusterConfig) (ConnectionPool, error) {
  382. return NewPolicyConnPool(
  383. cfg,
  384. NewRoundRobinHostPolicy(),
  385. NewRoundRobinConnPolicy,
  386. )
  387. }
  388. //NewTokenAwareConnPool creates a connection pool which selects hosts by
  389. //a token aware policy, and then selects a connection for that host by
  390. //round-robin.
  391. func NewTokenAwareConnPool(cfg *ClusterConfig) (ConnectionPool, error) {
  392. return NewPolicyConnPool(
  393. cfg,
  394. NewTokenAwareHostPolicy(NewRoundRobinHostPolicy()),
  395. NewRoundRobinConnPolicy,
  396. )
  397. }
  398. type policyConnPool struct {
  399. port int
  400. numConns int
  401. connCfg ConnConfig
  402. keyspace string
  403. mu sync.RWMutex
  404. hostPolicy HostSelectionPolicy
  405. connPolicy func() ConnSelectionPolicy
  406. hostConnPools map[string]*hostConnPool
  407. }
  408. //Creates a policy based connection pool. This func isn't meant to be directly
  409. //used as a NewPoolFunc in ClusterConfig, instead a func should be created
  410. //which satisfies the NewPoolFunc type, which calls this func with the desired
  411. //hostPolicy and connPolicy; see NewRoundRobinConnPool or NewTokenAwareConnPool
  412. //for examples.
  413. func NewPolicyConnPool(
  414. cfg *ClusterConfig,
  415. hostPolicy HostSelectionPolicy,
  416. connPolicy func() ConnSelectionPolicy,
  417. ) (ConnectionPool, error) {
  418. var err error
  419. var tlsConfig *tls.Config
  420. if cfg.SslOpts != nil {
  421. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  422. if err != nil {
  423. return nil, err
  424. }
  425. }
  426. // create the pool
  427. pool := &policyConnPool{
  428. port: cfg.Port,
  429. numConns: cfg.NumConns,
  430. connCfg: ConnConfig{
  431. ProtoVersion: cfg.ProtoVersion,
  432. CQLVersion: cfg.CQLVersion,
  433. Timeout: cfg.Timeout,
  434. NumStreams: cfg.NumStreams,
  435. Compressor: cfg.Compressor,
  436. Authenticator: cfg.Authenticator,
  437. Keepalive: cfg.SocketKeepalive,
  438. tlsConfig: tlsConfig,
  439. },
  440. keyspace: cfg.Keyspace,
  441. hostPolicy: hostPolicy,
  442. connPolicy: connPolicy,
  443. hostConnPools: map[string]*hostConnPool{},
  444. }
  445. hosts := make([]HostInfo, len(cfg.Hosts))
  446. for i, hostAddr := range cfg.Hosts {
  447. hosts[i].Peer = hostAddr
  448. }
  449. pool.SetHosts(hosts)
  450. return pool, nil
  451. }
  452. func (p *policyConnPool) SetHosts(hosts []HostInfo) {
  453. p.mu.Lock()
  454. toRemove := make(map[string]struct{})
  455. for addr := range p.hostConnPools {
  456. toRemove[addr] = struct{}{}
  457. }
  458. // TODO connect to hosts in parallel, but wait for pools to be
  459. // created before returning
  460. for i := range hosts {
  461. pool, exists := p.hostConnPools[hosts[i].Peer]
  462. if !exists {
  463. // create a connection pool for the host
  464. pool = newHostConnPool(
  465. hosts[i].Peer,
  466. p.port,
  467. p.numConns,
  468. p.connCfg,
  469. p.keyspace,
  470. p.connPolicy(),
  471. )
  472. p.hostConnPools[hosts[i].Peer] = pool
  473. } else {
  474. // still have this host, so don't remove it
  475. delete(toRemove, hosts[i].Peer)
  476. }
  477. }
  478. for addr := range toRemove {
  479. pool := p.hostConnPools[addr]
  480. delete(p.hostConnPools, addr)
  481. pool.Close()
  482. }
  483. // update the policy
  484. p.hostPolicy.SetHosts(hosts)
  485. p.mu.Unlock()
  486. }
  487. func (p *policyConnPool) SetPartitioner(partitioner string) {
  488. p.hostPolicy.SetPartitioner(partitioner)
  489. }
  490. func (p *policyConnPool) Size() int {
  491. p.mu.RLock()
  492. count := 0
  493. for _, pool := range p.hostConnPools {
  494. count += pool.Size()
  495. }
  496. p.mu.RUnlock()
  497. return count
  498. }
  499. func (p *policyConnPool) Pick(qry *Query) *Conn {
  500. nextHost := p.hostPolicy.Pick(qry)
  501. p.mu.RLock()
  502. var host *HostInfo
  503. var conn *Conn
  504. for conn == nil {
  505. host = nextHost()
  506. if host == nil {
  507. break
  508. }
  509. conn = p.hostConnPools[host.Peer].Pick(qry)
  510. }
  511. p.mu.RUnlock()
  512. return conn
  513. }
  514. func (p *policyConnPool) Close() {
  515. p.mu.Lock()
  516. // remove the hosts from the policy
  517. p.hostPolicy.SetHosts([]HostInfo{})
  518. // close the pools
  519. for addr, pool := range p.hostConnPools {
  520. delete(p.hostConnPools, addr)
  521. pool.Close()
  522. }
  523. p.mu.Unlock()
  524. }
  525. // hostConnPool is a connection pool for a single host.
  526. // Connection selection is based on a provided ConnSelectionPolicy
  527. type hostConnPool struct {
  528. host string
  529. port int
  530. addr string
  531. size int
  532. connCfg ConnConfig
  533. keyspace string
  534. policy ConnSelectionPolicy
  535. // protection for conns, closed, filling
  536. mu sync.RWMutex
  537. conns []*Conn
  538. closed bool
  539. filling bool
  540. }
  541. func newHostConnPool(
  542. host string,
  543. port int,
  544. size int,
  545. connCfg ConnConfig,
  546. keyspace string,
  547. policy ConnSelectionPolicy,
  548. ) *hostConnPool {
  549. pool := &hostConnPool{
  550. host: host,
  551. port: port,
  552. addr: JoinHostPort(host, port),
  553. size: size,
  554. connCfg: connCfg,
  555. keyspace: keyspace,
  556. policy: policy,
  557. conns: make([]*Conn, 0, size),
  558. filling: false,
  559. closed: false,
  560. }
  561. // fill the pool with the initial connections before returning
  562. pool.fill()
  563. return pool
  564. }
  565. // Pick a connection from this connection pool for the given query.
  566. func (pool *hostConnPool) Pick(qry *Query) *Conn {
  567. pool.mu.RLock()
  568. if pool.closed {
  569. pool.mu.RUnlock()
  570. return nil
  571. }
  572. empty := len(pool.conns) == 0
  573. pool.mu.RUnlock()
  574. if empty {
  575. // try to fill the empty pool
  576. go pool.fill()
  577. return nil
  578. }
  579. return pool.policy.Pick(qry)
  580. }
  581. //Size returns the number of connections currently active in the pool
  582. func (pool *hostConnPool) Size() int {
  583. pool.mu.RLock()
  584. defer pool.mu.RUnlock()
  585. return len(pool.conns)
  586. }
  587. //Close the connection pool
  588. func (pool *hostConnPool) Close() {
  589. pool.mu.Lock()
  590. defer pool.mu.Unlock()
  591. if pool.closed {
  592. return
  593. }
  594. pool.closed = true
  595. // drain, but don't wait
  596. go pool.drain()
  597. }
  598. // Fill the connection pool
  599. func (pool *hostConnPool) fill() {
  600. pool.mu.RLock()
  601. // avoid filling a closed pool, or concurrent filling
  602. if pool.closed || pool.filling {
  603. pool.mu.RUnlock()
  604. return
  605. }
  606. // determine the filling work to be done
  607. startCount := len(pool.conns)
  608. fillCount := pool.size - startCount
  609. // avoid filling a full (or overfull) pool
  610. if fillCount <= 0 {
  611. pool.mu.RUnlock()
  612. return
  613. }
  614. // switch from read to write lock
  615. pool.mu.RUnlock()
  616. pool.mu.Lock()
  617. // double check everything since the lock was released
  618. startCount = len(pool.conns)
  619. fillCount = pool.size - startCount
  620. if pool.closed || pool.filling || fillCount <= 0 {
  621. // looks like another goroutine already beat this
  622. // goroutine to the filling
  623. pool.mu.Unlock()
  624. return
  625. }
  626. // ok fill the pool
  627. pool.filling = true
  628. // allow others to access the pool while filling
  629. pool.mu.Unlock()
  630. // only this goroutine should make calls to fill/empty the pool at this
  631. // point until after this routine or its subordinates calls
  632. // fillingStopped
  633. // fill only the first connection synchronously
  634. if startCount == 0 {
  635. err := pool.connect()
  636. pool.logConnectErr(err)
  637. if err != nil {
  638. // probably unreachable host
  639. go pool.fillingStopped()
  640. return
  641. }
  642. // filled one
  643. fillCount--
  644. // connect all connections to this host in sync
  645. for fillCount > 0 {
  646. err := pool.connect()
  647. pool.logConnectErr(err)
  648. // decrement, even on error
  649. fillCount--
  650. }
  651. go pool.fillingStopped()
  652. return
  653. }
  654. // fill the rest of the pool asynchronously
  655. go func() {
  656. for fillCount > 0 {
  657. err := pool.connect()
  658. pool.logConnectErr(err)
  659. // decrement, even on error
  660. fillCount--
  661. }
  662. // mark the end of filling
  663. pool.fillingStopped()
  664. }()
  665. }
  666. func (pool *hostConnPool) logConnectErr(err error) {
  667. if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
  668. // connection refused
  669. // these are typical during a node outage so avoid log spam.
  670. } else if err != nil {
  671. // unexpected error
  672. log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
  673. }
  674. }
  675. // transition back to a not-filling state.
  676. func (pool *hostConnPool) fillingStopped() {
  677. // wait for some time to avoid back-to-back filling
  678. // this provides some time between failed attempts
  679. // to fill the pool for the host to recover
  680. time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
  681. pool.mu.Lock()
  682. pool.filling = false
  683. pool.mu.Unlock()
  684. }
  685. // create a new connection to the host and add it to the pool
  686. func (pool *hostConnPool) connect() error {
  687. // try to connect
  688. conn, err := Connect(pool.addr, pool.connCfg, pool)
  689. if err != nil {
  690. return err
  691. }
  692. if pool.keyspace != "" {
  693. // set the keyspace
  694. if err := conn.UseKeyspace(pool.keyspace); err != nil {
  695. conn.Close()
  696. return err
  697. }
  698. }
  699. // add the Conn to the pool
  700. pool.mu.Lock()
  701. defer pool.mu.Unlock()
  702. if pool.closed {
  703. conn.Close()
  704. return nil
  705. }
  706. pool.conns = append(pool.conns, conn)
  707. pool.policy.SetConns(pool.conns)
  708. return nil
  709. }
  710. // handle any error from a Conn
  711. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  712. if !closed {
  713. // still an open connection, so continue using it
  714. return
  715. }
  716. pool.mu.Lock()
  717. defer pool.mu.Unlock()
  718. if pool.closed {
  719. // pool closed
  720. return
  721. }
  722. // find the connection index
  723. for i, candidate := range pool.conns {
  724. if candidate == conn {
  725. // remove the connection, not preserving order
  726. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  727. // update the policy
  728. pool.policy.SetConns(pool.conns)
  729. // lost a connection, so fill the pool
  730. go pool.fill()
  731. break
  732. }
  733. }
  734. }
  735. // removes and closes all connections from the pool
  736. func (pool *hostConnPool) drain() {
  737. pool.mu.Lock()
  738. defer pool.mu.Unlock()
  739. // empty the pool
  740. conns := pool.conns
  741. pool.conns = pool.conns[:0]
  742. // update the policy
  743. pool.policy.SetConns(pool.conns)
  744. // close the connections
  745. for _, conn := range conns {
  746. conn.Close()
  747. }
  748. }