connectionpool.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "crypto/tls"
  7. "crypto/x509"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "sync"
  15. "time"
  16. )
  17. /*ConnectionPool represents the interface gocql will use to work with a collection of connections.
  18. Purpose
  19. The connection pool in gocql opens and closes connections as well as selects an available connection
  20. for gocql to execute a query against. The pool is also responsible for handling connection errors that
  21. are caught by the connection experiencing the error.
  22. A connection pool should make a copy of the variables used from the ClusterConfig provided to the pool
  23. upon creation. ClusterConfig is a pointer and can be modified after the creation of the pool. This can
  24. lead to issues with variables being modified outside the expectations of the ConnectionPool type.
  25. Example of Single Connection Pool:
  26. type SingleConnection struct {
  27. conn *Conn
  28. cfg *ClusterConfig
  29. }
  30. func NewSingleConnection(cfg *ClusterConfig) ConnectionPool {
  31. addr := JoinHostPort(cfg.Hosts[0], cfg.Port)
  32. connCfg := ConnConfig{
  33. ProtoVersion: cfg.ProtoVersion,
  34. CQLVersion: cfg.CQLVersion,
  35. Timeout: cfg.Timeout,
  36. NumStreams: cfg.NumStreams,
  37. Compressor: cfg.Compressor,
  38. Authenticator: cfg.Authenticator,
  39. Keepalive: cfg.SocketKeepalive,
  40. }
  41. pool := SingleConnection{cfg:cfg}
  42. pool.conn = Connect(addr,connCfg,pool)
  43. return &pool
  44. }
  45. func (s *SingleConnection) HandleError(conn *Conn, err error, closed bool) {
  46. if closed {
  47. connCfg := ConnConfig{
  48. ProtoVersion: cfg.ProtoVersion,
  49. CQLVersion: cfg.CQLVersion,
  50. Timeout: cfg.Timeout,
  51. NumStreams: cfg.NumStreams,
  52. Compressor: cfg.Compressor,
  53. Authenticator: cfg.Authenticator,
  54. Keepalive: cfg.SocketKeepalive,
  55. }
  56. s.conn = Connect(conn.Address(),connCfg,s)
  57. }
  58. }
  59. func (s *SingleConnection) Pick(qry *Query) *Conn {
  60. if s.conn.isClosed {
  61. return nil
  62. }
  63. return s.conn
  64. }
  65. func (s *SingleConnection) Size() int {
  66. return 1
  67. }
  68. func (s *SingleConnection) Close() {
  69. s.conn.Close()
  70. }
  71. This is a very simple example of a type that exposes the connection pool interface. To assign
  72. this type as the connection pool to use you would assign it to the ClusterConfig like so:
  73. cluster := NewCluster("127.0.0.1")
  74. cluster.ConnPoolType = NewSingleConnection
  75. ...
  76. session, err := cluster.CreateSession()
  77. To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
  78. */
  79. type ConnectionPool interface {
  80. SetHosts
  81. Pick(*Query) *Conn
  82. Size() int
  83. Close()
  84. }
// SetHosts is the interface to implement to receive the host information.
type SetHosts interface {
	// SetHosts is handed the list of hosts the implementation should use.
	SetHosts(hosts []HostInfo)
}
// SetPartitioner is the interface to implement to receive the partitioner
// value.
type SetPartitioner interface {
	// SetPartitioner is handed the partitioner as a string.
	SetPartitioner(partitioner string)
}
// NewPoolFunc is the type used by ClusterConfig to create a pool of a
// specific type. It receives the cluster configuration and returns the pool,
// or an error if the pool could not be created.
type NewPoolFunc func(*ClusterConfig) (ConnectionPool, error)
// SimplePool is the current implementation of the connection pool inside
// gocql. This pool is meant to be a simple default used by gocql so users can
// get up and running quickly.
type SimplePool struct {
	// cfg is the cluster configuration the pool was created from.
	cfg *ClusterConfig
	// hostPool round-robins over the per-address pools stored in connPool.
	hostPool *RoundRobin
	// connPool maps a connection address to the pool of connections to it.
	connPool map[string]*RoundRobin
	// conns is the set of every connection currently held by the pool.
	conns map[*Conn]struct{}
	// keyspace, when non-empty, is applied to each connection before it is
	// added to the pool.
	keyspace string

	// hostMu guards hosts.
	hostMu sync.RWMutex
	// this is the set of current hosts which the pool will attempt to connect to
	hosts map[string]*HostInfo

	// protects hostPool, connPool, conns, quit
	mu sync.Mutex

	// cFillingPool is a one-slot channel used as a token by fillPool so that
	// only one fill runs at a time.
	cFillingPool chan int

	// quit is set by Close; once set, new connections are closed instead of
	// being added.
	quit bool
	// quitWait is closed by Close.
	quitWait chan bool
	// quitOnce makes Close run its shutdown logic only once.
	quitOnce sync.Once

	// tlsConfig is passed to every new connection; nil when SslOpts is unset.
	tlsConfig *tls.Config
}
  115. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  116. certPool := x509.NewCertPool()
  117. // ca cert is optional
  118. if sslOpts.CaPath != "" {
  119. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  120. if err != nil {
  121. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  122. }
  123. if !certPool.AppendCertsFromPEM(pem) {
  124. return nil, errors.New("connectionpool: failed parsing or CA certs")
  125. }
  126. }
  127. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  128. if err != nil {
  129. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  130. }
  131. config := &tls.Config{
  132. Certificates: []tls.Certificate{mycert},
  133. RootCAs: certPool,
  134. }
  135. config.InsecureSkipVerify = !sslOpts.EnableHostVerification
  136. return config, nil
  137. }
  138. //NewSimplePool is the function used by gocql to create the simple connection pool.
  139. //This is the default if no other pool type is specified.
  140. func NewSimplePool(cfg *ClusterConfig) (ConnectionPool, error) {
  141. pool := &SimplePool{
  142. cfg: cfg,
  143. hostPool: NewRoundRobin(),
  144. connPool: make(map[string]*RoundRobin),
  145. conns: make(map[*Conn]struct{}),
  146. quitWait: make(chan bool),
  147. cFillingPool: make(chan int, 1),
  148. keyspace: cfg.Keyspace,
  149. hosts: make(map[string]*HostInfo),
  150. }
  151. for _, host := range cfg.Hosts {
  152. // seed hosts have unknown topology
  153. // TODO: Handle populating this during SetHosts
  154. pool.hosts[host] = &HostInfo{Peer: host}
  155. }
  156. if cfg.SslOpts != nil {
  157. config, err := setupTLSConfig(cfg.SslOpts)
  158. if err != nil {
  159. return nil, err
  160. }
  161. pool.tlsConfig = config
  162. }
  163. //Walk through connecting to hosts. As soon as one host connects
  164. //defer the remaining connections to cluster.fillPool()
  165. for i := 0; i < len(cfg.Hosts); i++ {
  166. addr := JoinHostPort(cfg.Hosts[i], cfg.Port)
  167. if pool.connect(addr) == nil {
  168. pool.cFillingPool <- 1
  169. go pool.fillPool()
  170. break
  171. }
  172. }
  173. return pool, nil
  174. }
  175. func (c *SimplePool) connect(addr string) error {
  176. cfg := ConnConfig{
  177. ProtoVersion: c.cfg.ProtoVersion,
  178. CQLVersion: c.cfg.CQLVersion,
  179. Timeout: c.cfg.Timeout,
  180. NumStreams: c.cfg.NumStreams,
  181. Compressor: c.cfg.Compressor,
  182. Authenticator: c.cfg.Authenticator,
  183. Keepalive: c.cfg.SocketKeepalive,
  184. tlsConfig: c.tlsConfig,
  185. }
  186. conn, err := Connect(addr, cfg, c)
  187. if err != nil {
  188. log.Printf("connect: failed to connect to %q: %v", addr, err)
  189. return err
  190. }
  191. return c.addConn(conn)
  192. }
  193. func (c *SimplePool) addConn(conn *Conn) error {
  194. c.mu.Lock()
  195. defer c.mu.Unlock()
  196. if c.quit {
  197. conn.Close()
  198. return nil
  199. }
  200. //Set the connection's keyspace if any before adding it to the pool
  201. if c.keyspace != "" {
  202. if err := conn.UseKeyspace(c.keyspace); err != nil {
  203. log.Printf("error setting connection keyspace. %v", err)
  204. conn.Close()
  205. return err
  206. }
  207. }
  208. connPool := c.connPool[conn.Address()]
  209. if connPool == nil {
  210. connPool = NewRoundRobin()
  211. c.connPool[conn.Address()] = connPool
  212. c.hostPool.AddNode(connPool)
  213. }
  214. connPool.AddNode(conn)
  215. c.conns[conn] = struct{}{}
  216. return nil
  217. }
  218. //fillPool manages the pool of connections making sure that each host has the correct
  219. //amount of connections defined. Also the method will test a host with one connection
  220. //instead of flooding the host with number of connections defined in the cluster config
  221. func (c *SimplePool) fillPool() {
  222. //Debounce large amounts of requests to fill pool
  223. select {
  224. case <-time.After(1 * time.Millisecond):
  225. return
  226. case <-c.cFillingPool:
  227. defer func() { c.cFillingPool <- 1 }()
  228. }
  229. c.mu.Lock()
  230. isClosed := c.quit
  231. c.mu.Unlock()
  232. //Exit if cluster(session) is closed
  233. if isClosed {
  234. return
  235. }
  236. c.hostMu.RLock()
  237. //Walk through list of defined hosts
  238. var wg sync.WaitGroup
  239. for host := range c.hosts {
  240. addr := JoinHostPort(host, c.cfg.Port)
  241. numConns := 1
  242. //See if the host already has connections in the pool
  243. c.mu.Lock()
  244. conns, ok := c.connPool[addr]
  245. c.mu.Unlock()
  246. if ok {
  247. //if the host has enough connections just exit
  248. numConns = conns.Size()
  249. if numConns >= c.cfg.NumConns {
  250. continue
  251. }
  252. } else {
  253. //See if the host is reachable
  254. if err := c.connect(addr); err != nil {
  255. continue
  256. }
  257. }
  258. //This is reached if the host is responsive and needs more connections
  259. //Create connections for host synchronously to mitigate flooding the host.
  260. wg.Add(1)
  261. go func(a string, conns int) {
  262. defer wg.Done()
  263. for ; conns < c.cfg.NumConns; conns++ {
  264. c.connect(a)
  265. }
  266. }(addr, numConns)
  267. }
  268. c.hostMu.RUnlock()
  269. //Wait until we're finished connecting to each host before returning
  270. wg.Wait()
  271. }
  272. // Should only be called if c.mu is locked
  273. func (c *SimplePool) removeConnLocked(conn *Conn) {
  274. conn.Close()
  275. connPool := c.connPool[conn.addr]
  276. if connPool == nil {
  277. return
  278. }
  279. connPool.RemoveNode(conn)
  280. if connPool.Size() == 0 {
  281. c.hostPool.RemoveNode(connPool)
  282. delete(c.connPool, conn.addr)
  283. }
  284. delete(c.conns, conn)
  285. }
// removeConn acquires c.mu and removes conn from the pool via
// removeConnLocked.
func (c *SimplePool) removeConn(conn *Conn) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.removeConnLocked(conn)
}
  291. //HandleError is called by a Connection object to report to the pool an error has occured.
  292. //Logic is then executed within the pool to clean up the erroroneous connection and try to
  293. //top off the pool.
  294. func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) {
  295. if !closed {
  296. // ignore all non-fatal errors
  297. return
  298. }
  299. c.removeConn(conn)
  300. c.mu.Lock()
  301. poolClosed := c.quit
  302. c.mu.Unlock()
  303. if !poolClosed {
  304. go c.fillPool() // top off pool.
  305. }
  306. }
  307. //Pick selects a connection to be used by the query.
  308. func (c *SimplePool) Pick(qry *Query) *Conn {
  309. //Check if connections are available
  310. c.mu.Lock()
  311. conns := len(c.conns)
  312. c.mu.Unlock()
  313. if conns == 0 {
  314. //try to populate the pool before returning.
  315. c.fillPool()
  316. }
  317. return c.hostPool.Pick(qry)
  318. }
  319. //Size returns the number of connections currently active in the pool
  320. func (p *SimplePool) Size() int {
  321. p.mu.Lock()
  322. conns := len(p.conns)
  323. p.mu.Unlock()
  324. return conns
  325. }
  326. //Close kills the pool and all associated connections.
  327. func (c *SimplePool) Close() {
  328. c.quitOnce.Do(func() {
  329. c.mu.Lock()
  330. defer c.mu.Unlock()
  331. c.quit = true
  332. close(c.quitWait)
  333. for conn := range c.conns {
  334. c.removeConnLocked(conn)
  335. }
  336. })
  337. }
  338. func (c *SimplePool) SetHosts(hosts []HostInfo) {
  339. c.hostMu.Lock()
  340. toRemove := make(map[string]struct{})
  341. for k := range c.hosts {
  342. toRemove[k] = struct{}{}
  343. }
  344. for _, host := range hosts {
  345. host := host
  346. delete(toRemove, host.Peer)
  347. // we already have it
  348. if _, ok := c.hosts[host.Peer]; ok {
  349. // TODO: Check rack, dc, token range is consistent, trigger topology change
  350. // update stored host
  351. continue
  352. }
  353. c.hosts[host.Peer] = &host
  354. }
  355. // can we hold c.mu whilst iterating this loop?
  356. for addr := range toRemove {
  357. c.removeHostLocked(addr)
  358. }
  359. c.hostMu.Unlock()
  360. c.fillPool()
  361. }
  362. func (c *SimplePool) removeHostLocked(addr string) {
  363. if _, ok := c.hosts[addr]; !ok {
  364. return
  365. }
  366. delete(c.hosts, addr)
  367. c.mu.Lock()
  368. defer c.mu.Unlock()
  369. if _, ok := c.connPool[addr]; !ok {
  370. return
  371. }
  372. for conn := range c.conns {
  373. if conn.Address() == addr {
  374. c.removeConnLocked(conn)
  375. }
  376. }
  377. }
  378. //NewRoundRobinConnPool creates a connection pool which selects hosts by
  379. //round-robin, and then selects a connection for that host by round-robin.
  380. func NewRoundRobinConnPool(cfg *ClusterConfig) (ConnectionPool, error) {
  381. return NewPolicyConnPool(
  382. cfg,
  383. NewRoundRobinHostPolicy(),
  384. NewRoundRobinConnPolicy,
  385. )
  386. }
  387. //NewTokenAwareConnPool creates a connection pool which selects hosts by
  388. //a token aware policy, and then selects a connection for that host by
  389. //round-robin.
  390. func NewTokenAwareConnPool(cfg *ClusterConfig) (ConnectionPool, error) {
  391. return NewPolicyConnPool(
  392. cfg,
  393. NewTokenAwareHostPolicy(NewRoundRobinHostPolicy()),
  394. NewRoundRobinConnPolicy,
  395. )
  396. }
// policyConnPool is a connection pool that delegates host selection to a
// HostSelectionPolicy and keeps one hostConnPool per host.
type policyConnPool struct {
	// port, numConns, connCfg and keyspace are copied from the ClusterConfig
	// at creation and handed to every hostConnPool that gets created.
	port     int
	numConns int
	connCfg  ConnConfig
	keyspace string

	// mu is held while hostConnPools is read or modified.
	mu sync.RWMutex
	// hostPolicy decides which hosts a query is tried against.
	hostPolicy HostSelectionPolicy
	// connPolicy creates the connection selection policy for each new
	// hostConnPool.
	connPolicy func() ConnSelectionPolicy
	// hostConnPools maps a host's Peer value to its connection pool.
	hostConnPools map[string]*hostConnPool
}
  407. //Creates a policy based connection pool. This func isn't meant to be directly
  408. //used as a NewPoolFunc in ClusterConfig, instead a func should be created
  409. //which satisfies the NewPoolFunc type, which calls this func with the desired
  410. //hostPolicy and connPolicy; see NewRoundRobinConnPool or NewTokenAwareConnPool
  411. //for examples.
  412. func NewPolicyConnPool(
  413. cfg *ClusterConfig,
  414. hostPolicy HostSelectionPolicy,
  415. connPolicy func() ConnSelectionPolicy,
  416. ) (ConnectionPool, error) {
  417. var err error
  418. var tlsConfig *tls.Config
  419. if cfg.SslOpts != nil {
  420. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  421. if err != nil {
  422. return nil, err
  423. }
  424. }
  425. // create the pool
  426. pool := &policyConnPool{
  427. port: cfg.Port,
  428. numConns: cfg.NumConns,
  429. connCfg: ConnConfig{
  430. ProtoVersion: cfg.ProtoVersion,
  431. CQLVersion: cfg.CQLVersion,
  432. Timeout: cfg.Timeout,
  433. NumStreams: cfg.NumStreams,
  434. Compressor: cfg.Compressor,
  435. Authenticator: cfg.Authenticator,
  436. Keepalive: cfg.SocketKeepalive,
  437. tlsConfig: tlsConfig,
  438. },
  439. keyspace: cfg.Keyspace,
  440. hostPolicy: hostPolicy,
  441. connPolicy: connPolicy,
  442. hostConnPools: map[string]*hostConnPool{},
  443. }
  444. hosts := make([]HostInfo, len(cfg.Hosts))
  445. for i, hostAddr := range cfg.Hosts {
  446. hosts[i].Peer = hostAddr
  447. }
  448. pool.SetHosts(hosts)
  449. return pool, nil
  450. }
  451. func (p *policyConnPool) SetHosts(hosts []HostInfo) {
  452. p.mu.Lock()
  453. toRemove := make(map[string]struct{})
  454. for addr := range p.hostConnPools {
  455. toRemove[addr] = struct{}{}
  456. }
  457. // TODO connect to hosts in parallel, but wait for pools to be
  458. // created before returning
  459. for i := range hosts {
  460. pool, exists := p.hostConnPools[hosts[i].Peer]
  461. if !exists {
  462. // create a connection pool for the host
  463. pool = newHostConnPool(
  464. hosts[i].Peer,
  465. p.port,
  466. p.numConns,
  467. p.connCfg,
  468. p.keyspace,
  469. p.connPolicy(),
  470. )
  471. p.hostConnPools[hosts[i].Peer] = pool
  472. } else {
  473. // still have this host, so don't remove it
  474. delete(toRemove, hosts[i].Peer)
  475. }
  476. }
  477. for addr := range toRemove {
  478. pool := p.hostConnPools[addr]
  479. delete(p.hostConnPools, addr)
  480. pool.Close()
  481. }
  482. // update the policy
  483. p.hostPolicy.SetHosts(hosts)
  484. p.mu.Unlock()
  485. }
// SetPartitioner forwards the partitioner value to the host selection policy.
func (p *policyConnPool) SetPartitioner(partitioner string) {
	p.hostPolicy.SetPartitioner(partitioner)
}
  489. func (p *policyConnPool) Size() int {
  490. p.mu.RLock()
  491. count := 0
  492. for _, pool := range p.hostConnPools {
  493. count += pool.Size()
  494. }
  495. p.mu.RUnlock()
  496. return count
  497. }
  498. func (p *policyConnPool) Pick(qry *Query) *Conn {
  499. nextHost := p.hostPolicy.Pick(qry)
  500. p.mu.RLock()
  501. var host *HostInfo
  502. var conn *Conn
  503. for conn == nil {
  504. host = nextHost()
  505. if host == nil {
  506. break
  507. }
  508. conn = p.hostConnPools[host.Peer].Pick(qry)
  509. }
  510. p.mu.RUnlock()
  511. return conn
  512. }
  513. func (p *policyConnPool) Close() {
  514. p.mu.Lock()
  515. // remove the hosts from the policy
  516. p.hostPolicy.SetHosts([]HostInfo{})
  517. // close the pools
  518. for addr, pool := range p.hostConnPools {
  519. delete(p.hostConnPools, addr)
  520. pool.Close()
  521. }
  522. p.mu.Unlock()
  523. }
// hostConnPool is a connection pool for a single host.
// Connection selection is based on a provided ConnSelectionPolicy
type hostConnPool struct {
	host string
	port int
	// addr is host and port joined with JoinHostPort; it is the address
	// connections are opened to.
	addr string
	// size is the number of connections fill tries to reach.
	size int
	// connCfg is the configuration used for every new connection.
	connCfg ConnConfig
	// keyspace, when non-empty, is set on every new connection.
	keyspace string
	// policy is told about every change to conns and picks the connection
	// for a query.
	policy ConnSelectionPolicy

	// protection for conns, closed, filling
	mu sync.RWMutex
	// conns are the connections currently in the pool.
	conns []*Conn
	// closed is set by Close.
	closed bool
	// filling is true while a fill is in progress.
	filling bool
}
  540. func newHostConnPool(
  541. host string,
  542. port int,
  543. size int,
  544. connCfg ConnConfig,
  545. keyspace string,
  546. policy ConnSelectionPolicy,
  547. ) *hostConnPool {
  548. pool := &hostConnPool{
  549. host: host,
  550. port: port,
  551. addr: JoinHostPort(host, port),
  552. size: size,
  553. connCfg: connCfg,
  554. keyspace: keyspace,
  555. policy: policy,
  556. conns: make([]*Conn, 0, size),
  557. filling: false,
  558. closed: false,
  559. }
  560. // fill the pool with the initial connections before returning
  561. pool.fill()
  562. return pool
  563. }
  564. // Pick a connection from this connection pool for the given query.
  565. func (pool *hostConnPool) Pick(qry *Query) *Conn {
  566. pool.mu.RLock()
  567. if pool.closed {
  568. pool.mu.RUnlock()
  569. return nil
  570. }
  571. empty := len(pool.conns) == 0
  572. pool.mu.RUnlock()
  573. if empty {
  574. // try to fill the empty pool
  575. pool.fill()
  576. }
  577. return pool.policy.Pick(qry)
  578. }
  579. //Size returns the number of connections currently active in the pool
  580. func (pool *hostConnPool) Size() int {
  581. pool.mu.RLock()
  582. defer pool.mu.RUnlock()
  583. return len(pool.conns)
  584. }
  585. //Close the connection pool
  586. func (pool *hostConnPool) Close() {
  587. pool.mu.Lock()
  588. defer pool.mu.Unlock()
  589. if pool.closed {
  590. return
  591. }
  592. pool.closed = true
  593. // drain, but don't wait
  594. go pool.drain()
  595. }
  596. // Fill the connection pool
  597. func (pool *hostConnPool) fill() {
  598. pool.mu.RLock()
  599. // avoid filling a closed pool, or concurrent filling
  600. if pool.closed || pool.filling {
  601. pool.mu.RUnlock()
  602. return
  603. }
  604. // determine the filling work to be done
  605. startCount := len(pool.conns)
  606. fillCount := pool.size - startCount
  607. // avoid filling a full (or overfull) pool
  608. if fillCount <= 0 {
  609. pool.mu.RUnlock()
  610. return
  611. }
  612. // switch from read to write lock
  613. pool.mu.RUnlock()
  614. pool.mu.Lock()
  615. // double check everything since the lock was released
  616. startCount = len(pool.conns)
  617. fillCount = pool.size - startCount
  618. if pool.closed || pool.filling || fillCount <= 0 {
  619. // looks like another goroutine already beat this
  620. // goroutine to the filling
  621. pool.mu.Unlock()
  622. return
  623. }
  624. // ok fill the pool
  625. pool.filling = true
  626. // allow others to access the pool while filling
  627. pool.mu.Unlock()
  628. // only this goroutine should make calls to fill/empty the pool at this
  629. // point until after this routine or its subordinates calls
  630. // fillingStopped
  631. // fill only the first connection synchronously
  632. if startCount == 0 {
  633. err := pool.connect()
  634. pool.logConnectErr(err)
  635. if err != nil {
  636. // probably unreachable host
  637. go pool.fillingStopped()
  638. return
  639. }
  640. // filled one
  641. fillCount--
  642. }
  643. // fill the rest of the pool asynchronously
  644. go func() {
  645. for fillCount > 0 {
  646. err := pool.connect()
  647. pool.logConnectErr(err)
  648. // decrement, even on error
  649. fillCount--
  650. }
  651. // mark the end of filling
  652. pool.fillingStopped()
  653. }()
  654. }
  655. func (pool *hostConnPool) logConnectErr(err error) {
  656. if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
  657. // connection refused
  658. // these are typical during a node outage so avoid log spam.
  659. } else if err != nil {
  660. // unexpected error
  661. log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
  662. }
  663. }
  664. // transition back to a not-filling state.
  665. func (pool *hostConnPool) fillingStopped() {
  666. // wait for some time to avoid back-to-back filling
  667. // this provides some time between failed attempts
  668. // to fill the pool for the host to recover
  669. time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
  670. pool.mu.Lock()
  671. pool.filling = false
  672. pool.mu.Unlock()
  673. }
  674. // create a new connection to the host and add it to the pool
  675. func (pool *hostConnPool) connect() error {
  676. // try to connect
  677. conn, err := Connect(pool.addr, pool.connCfg, pool)
  678. if err != nil {
  679. return err
  680. }
  681. if pool.keyspace != "" {
  682. // set the keyspace
  683. if err := conn.UseKeyspace(pool.keyspace); err != nil {
  684. conn.Close()
  685. return err
  686. }
  687. }
  688. // add the Conn to the pool
  689. pool.mu.Lock()
  690. defer pool.mu.Unlock()
  691. if pool.closed {
  692. conn.Close()
  693. return nil
  694. }
  695. pool.conns = append(pool.conns, conn)
  696. pool.policy.SetConns(pool.conns)
  697. return nil
  698. }
  699. // handle any error from a Conn
  700. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  701. if !closed {
  702. // still an open connection, so continue using it
  703. return
  704. }
  705. pool.mu.Lock()
  706. defer pool.mu.Unlock()
  707. if pool.closed {
  708. // pool closed
  709. return
  710. }
  711. // find the connection index
  712. for i, candidate := range pool.conns {
  713. if candidate == conn {
  714. // remove the connection, not preserving order
  715. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  716. // update the policy
  717. pool.policy.SetConns(pool.conns)
  718. // lost a connection, so fill the pool
  719. go pool.fill()
  720. break
  721. }
  722. }
  723. }
  724. // removes and closes all connections from the pool
  725. func (pool *hostConnPool) drain() {
  726. pool.mu.Lock()
  727. defer pool.mu.Unlock()
  728. // empty the pool
  729. conns := pool.conns
  730. pool.conns = pool.conns[:0]
  731. // update the policy
  732. pool.policy.SetConns(pool.conns)
  733. // close the connections
  734. for _, conn := range conns {
  735. conn.Close()
  736. }
  737. }