connectionpool.go 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842
  1. package gocql
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "net"
  10. "sync"
  11. "time"
  12. )
  /*
  ConnectionPool represents the interface gocql will use to work with a collection of connections.

  Purpose

  The connection pool in gocql opens and closes connections as well as selects an available connection
  for gocql to execute a query against. The pool is also responsible for handling connection errors that
  are caught by the connection experiencing the error.

  A connection pool should make a copy of the variables used from the ClusterConfig provided to the pool
  upon creation. ClusterConfig is a pointer and can be modified after the creation of the pool. This can
  lead to issues with variables being modified outside the expectations of the ConnectionPool type.

  Example of Single Connection Pool:

  	type SingleConnection struct {
  		conn *Conn
  		cfg  *ClusterConfig
  	}

  	func NewSingleConnection(cfg *ClusterConfig) (ConnectionPool, error) {
  		addr := JoinHostPort(cfg.Hosts[0], cfg.Port)

  		connCfg := ConnConfig{
  			ProtoVersion:  cfg.ProtoVersion,
  			CQLVersion:    cfg.CQLVersion,
  			Timeout:       cfg.Timeout,
  			NumStreams:    cfg.NumStreams,
  			Compressor:    cfg.Compressor,
  			Authenticator: cfg.Authenticator,
  			Keepalive:     cfg.SocketKeepalive,
  		}
  		pool := &SingleConnection{cfg: cfg}
  		conn, err := Connect(addr, connCfg, pool)
  		if err != nil {
  			return nil, err
  		}
  		pool.conn = conn
  		return pool, nil
  	}

  	func (s *SingleConnection) HandleError(conn *Conn, err error, closed bool) {
  		if closed {
  			connCfg := ConnConfig{
  				ProtoVersion:  s.cfg.ProtoVersion,
  				CQLVersion:    s.cfg.CQLVersion,
  				Timeout:       s.cfg.Timeout,
  				NumStreams:    s.cfg.NumStreams,
  				Compressor:    s.cfg.Compressor,
  				Authenticator: s.cfg.Authenticator,
  				Keepalive:     s.cfg.SocketKeepalive,
  			}
  			// A real pool should handle the reconnect error rather than drop it.
  			s.conn, _ = Connect(conn.Address(), connCfg, s)
  		}
  	}

  	func (s *SingleConnection) Pick(qry *Query) *Conn {
  		if s.conn.isClosed {
  			return nil
  		}
  		return s.conn
  	}

  	func (s *SingleConnection) Size() int {
  		return 1
  	}

  	func (s *SingleConnection) Close() {
  		s.conn.Close()
  	}

  	func (s *SingleConnection) SetHosts(hosts []HostInfo) {
  		// A single-connection pool ignores host list updates.
  	}

  This is a very simple example of a type that exposes the connection pool interface. To assign
  this type as the connection pool to use you would assign it to the ClusterConfig like so:

  	cluster := NewCluster("127.0.0.1")
  	cluster.ConnPoolType = NewSingleConnection
  	...
  	session, err := cluster.CreateSession()

  To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
  */
  type ConnectionPool interface {
  	// Pick selects a connection to be used by the query.
  	Pick(*Query) *Conn
  	// Size returns the number of connections currently active in the pool.
  	Size() int
  	// Close shuts the pool down together with its connections.
  	Close()
  	// SetHosts gives the pool the current list of hosts to maintain connections to.
  	SetHosts(hosts []HostInfo)
  }
  // SetPartitioner is the interface to implement to receive the partitioner value.
  type SetPartitioner interface {
  	// SetPartitioner is called with the partitioner name.
  	SetPartitioner(partitioner string)
  }
  // NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
  // It receives the cluster configuration and returns the pool, or an error when the
  // pool cannot be set up.
  type NewPoolFunc func(*ClusterConfig) (ConnectionPool, error)
  // SimplePool is the current implementation of the connection pool inside gocql. This
  // pool is meant to be a simple default used by gocql so users can get up and running
  // quickly.
  type SimplePool struct {
  	// cfg is the cluster configuration the pool was created with.
  	cfg *ClusterConfig
  	// hostPool round-robins over the per-host pools stored in connPool.
  	hostPool *RoundRobin
  	// connPool holds one round-robin set of connections per host, keyed by
  	// the connection's Address().
  	connPool map[string]*RoundRobin
  	// conns is the set of every connection currently held by the pool.
  	conns map[*Conn]struct{}
  	// keyspace, when non-empty, is applied to each connection before it is
  	// added to the pool.
  	keyspace string

  	// hostMu guards hosts.
  	hostMu sync.RWMutex
  	// this is the set of current hosts which the pool will attempt to connect to
  	hosts map[string]*HostInfo

  	// protects hostPool, connPool, conns, quit
  	mu sync.Mutex

  	// cFillingPool is a one-slot channel carrying the token that fillPool
  	// takes before running and puts back when it is done.
  	cFillingPool chan int

  	// quit is set by Close; once set, new connections are closed instead of
  	// being added to the pool.
  	quit bool
  	// quitWait is closed by Close.
  	quitWait chan bool
  	// quitOnce makes the shutdown logic in Close run only once.
  	quitOnce sync.Once

  	// tlsConfig is built from cfg.SslOpts; it stays nil when no SSL options are set.
  	tlsConfig *tls.Config
  }
  107. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  108. certPool := x509.NewCertPool()
  109. // ca cert is optional
  110. if sslOpts.CaPath != "" {
  111. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  112. if err != nil {
  113. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  114. }
  115. if !certPool.AppendCertsFromPEM(pem) {
  116. return nil, errors.New("connectionpool: failed parsing or CA certs")
  117. }
  118. }
  119. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  120. if err != nil {
  121. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  122. }
  123. config := &tls.Config{
  124. Certificates: []tls.Certificate{mycert},
  125. RootCAs: certPool,
  126. }
  127. config.InsecureSkipVerify = !sslOpts.EnableHostVerification
  128. return config, nil
  129. }
  130. //NewSimplePool is the function used by gocql to create the simple connection pool.
  131. //This is the default if no other pool type is specified.
  132. func NewSimplePool(cfg *ClusterConfig) (ConnectionPool, error) {
  133. pool := &SimplePool{
  134. cfg: cfg,
  135. hostPool: NewRoundRobin(),
  136. connPool: make(map[string]*RoundRobin),
  137. conns: make(map[*Conn]struct{}),
  138. quitWait: make(chan bool),
  139. cFillingPool: make(chan int, 1),
  140. keyspace: cfg.Keyspace,
  141. hosts: make(map[string]*HostInfo),
  142. }
  143. for _, host := range cfg.Hosts {
  144. // seed hosts have unknown topology
  145. // TODO: Handle populating this during SetHosts
  146. pool.hosts[host] = &HostInfo{Peer: host}
  147. }
  148. if cfg.SslOpts != nil {
  149. config, err := setupTLSConfig(cfg.SslOpts)
  150. if err != nil {
  151. return nil, err
  152. }
  153. pool.tlsConfig = config
  154. }
  155. //Walk through connecting to hosts. As soon as one host connects
  156. //defer the remaining connections to cluster.fillPool()
  157. for i := 0; i < len(cfg.Hosts); i++ {
  158. addr := JoinHostPort(cfg.Hosts[i], cfg.Port)
  159. if pool.connect(addr) == nil {
  160. pool.cFillingPool <- 1
  161. go pool.fillPool()
  162. break
  163. }
  164. }
  165. return pool, nil
  166. }
  167. func (c *SimplePool) connect(addr string) error {
  168. cfg := ConnConfig{
  169. ProtoVersion: c.cfg.ProtoVersion,
  170. CQLVersion: c.cfg.CQLVersion,
  171. Timeout: c.cfg.Timeout,
  172. NumStreams: c.cfg.NumStreams,
  173. Compressor: c.cfg.Compressor,
  174. Authenticator: c.cfg.Authenticator,
  175. Keepalive: c.cfg.SocketKeepalive,
  176. tlsConfig: c.tlsConfig,
  177. }
  178. conn, err := Connect(addr, cfg, c)
  179. if err != nil {
  180. log.Printf("connect: failed to connect to %q: %v", addr, err)
  181. return err
  182. }
  183. return c.addConn(conn)
  184. }
  185. func (c *SimplePool) addConn(conn *Conn) error {
  186. c.mu.Lock()
  187. defer c.mu.Unlock()
  188. if c.quit {
  189. conn.Close()
  190. return nil
  191. }
  192. //Set the connection's keyspace if any before adding it to the pool
  193. if c.keyspace != "" {
  194. if err := conn.UseKeyspace(c.keyspace); err != nil {
  195. log.Printf("error setting connection keyspace. %v", err)
  196. conn.Close()
  197. return err
  198. }
  199. }
  200. connPool := c.connPool[conn.Address()]
  201. if connPool == nil {
  202. connPool = NewRoundRobin()
  203. c.connPool[conn.Address()] = connPool
  204. c.hostPool.AddNode(connPool)
  205. }
  206. connPool.AddNode(conn)
  207. c.conns[conn] = struct{}{}
  208. return nil
  209. }
  // fillPool manages the pool of connections making sure that each host has the correct
  // amount of connections defined. Also the method will test a host with one connection
  // instead of flooding the host with number of connections defined in the cluster config.
  //
  // Only one fillPool runs at a time: it must take the token from cFillingPool,
  // and gives up if the token does not become available within one millisecond.
  // It returns without doing anything when the pool has been closed, and
  // otherwise blocks until the per-host connection goroutines have finished.
  func (c *SimplePool) fillPool() {
  	// Debounce large amounts of requests to fill pool: either we obtain the
  	// token (and hand it back on return) or we bail out after the timeout.
  	select {
  	case <-time.After(1 * time.Millisecond):
  		return
  	case <-c.cFillingPool:
  		defer func() { c.cFillingPool <- 1 }()
  	}

  	c.mu.Lock()
  	isClosed := c.quit
  	c.mu.Unlock()
  	// Exit if cluster(session) is closed
  	if isClosed {
  		return
  	}

  	// The host set is read-locked for the whole walk, including the
  	// synchronous reachability probe below.
  	c.hostMu.RLock()

  	// Walk through list of defined hosts
  	var wg sync.WaitGroup
  	for host := range c.hosts {
  		addr := JoinHostPort(host, c.cfg.Port)

  		// Defaults to 1 because a host without a pool gets exactly one
  		// connection from the probe below before the goroutine tops it up.
  		numConns := 1
  		// See if the host already has connections in the pool
  		c.mu.Lock()
  		conns, ok := c.connPool[addr]
  		c.mu.Unlock()

  		if ok {
  			// if the host has enough connections just move on to the next host
  			numConns = conns.Size()
  			if numConns >= c.cfg.NumConns {
  				continue
  			}
  		} else {
  			// See if the host is reachable
  			if err := c.connect(addr); err != nil {
  				continue
  			}
  		}

  		// This is reached if the host is responsive and needs more connections
  		// Create connections for host synchronously to mitigate flooding the host.
  		wg.Add(1)
  		go func(a string, conns int) {
  			defer wg.Done()
  			// One attempt per missing connection; a failed attempt is not
  			// retried here (connect logs the failure itself).
  			for ; conns < c.cfg.NumConns; conns++ {
  				c.connect(a)
  			}
  		}(addr, numConns)
  	}

  	c.hostMu.RUnlock()

  	// Wait until we're finished connecting to each host before returning
  	wg.Wait()
  }
  264. // Should only be called if c.mu is locked
  265. func (c *SimplePool) removeConnLocked(conn *Conn) {
  266. conn.Close()
  267. connPool := c.connPool[conn.addr]
  268. if connPool == nil {
  269. return
  270. }
  271. connPool.RemoveNode(conn)
  272. if connPool.Size() == 0 {
  273. c.hostPool.RemoveNode(connPool)
  274. delete(c.connPool, conn.addr)
  275. }
  276. delete(c.conns, conn)
  277. }
  278. func (c *SimplePool) removeConn(conn *Conn) {
  279. c.mu.Lock()
  280. defer c.mu.Unlock()
  281. c.removeConnLocked(conn)
  282. }
  283. //HandleError is called by a Connection object to report to the pool an error has occured.
  284. //Logic is then executed within the pool to clean up the erroroneous connection and try to
  285. //top off the pool.
  286. func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) {
  287. if !closed {
  288. // ignore all non-fatal errors
  289. return
  290. }
  291. c.removeConn(conn)
  292. c.mu.Lock()
  293. poolClosed := c.quit
  294. c.mu.Unlock()
  295. if !poolClosed {
  296. go c.fillPool() // top off pool.
  297. }
  298. }
  299. //Pick selects a connection to be used by the query.
  300. func (c *SimplePool) Pick(qry *Query) *Conn {
  301. //Check if connections are available
  302. c.mu.Lock()
  303. conns := len(c.conns)
  304. c.mu.Unlock()
  305. if conns == 0 {
  306. //try to populate the pool before returning.
  307. c.fillPool()
  308. }
  309. return c.hostPool.Pick(qry)
  310. }
  311. //Size returns the number of connections currently active in the pool
  312. func (p *SimplePool) Size() int {
  313. p.mu.Lock()
  314. conns := len(p.conns)
  315. p.mu.Unlock()
  316. return conns
  317. }
  318. //Close kills the pool and all associated connections.
  319. func (c *SimplePool) Close() {
  320. c.quitOnce.Do(func() {
  321. c.mu.Lock()
  322. defer c.mu.Unlock()
  323. c.quit = true
  324. close(c.quitWait)
  325. for conn := range c.conns {
  326. c.removeConnLocked(conn)
  327. }
  328. })
  329. }
  330. func (c *SimplePool) SetHosts(hosts []HostInfo) {
  331. c.hostMu.Lock()
  332. toRemove := make(map[string]struct{})
  333. for k := range c.hosts {
  334. toRemove[k] = struct{}{}
  335. }
  336. for _, host := range hosts {
  337. host := host
  338. delete(toRemove, host.Peer)
  339. // we already have it
  340. if _, ok := c.hosts[host.Peer]; ok {
  341. // TODO: Check rack, dc, token range is consistent, trigger topology change
  342. // update stored host
  343. continue
  344. }
  345. c.hosts[host.Peer] = &host
  346. }
  347. // can we hold c.mu whilst iterating this loop?
  348. for addr := range toRemove {
  349. c.removeHostLocked(addr)
  350. }
  351. c.hostMu.Unlock()
  352. c.fillPool()
  353. }
  354. func (c *SimplePool) removeHostLocked(addr string) {
  355. if _, ok := c.hosts[addr]; !ok {
  356. return
  357. }
  358. delete(c.hosts, addr)
  359. c.mu.Lock()
  360. defer c.mu.Unlock()
  361. if _, ok := c.connPool[addr]; !ok {
  362. return
  363. }
  364. for conn := range c.conns {
  365. if conn.Address() == addr {
  366. c.removeConnLocked(conn)
  367. }
  368. }
  369. }
  370. //NewRoundRobinConnPool creates a connection pool which selects hosts by
  371. //round-robin, and then selects a connection for that host by round-robin.
  372. func NewRoundRobinConnPool(cfg *ClusterConfig) (ConnectionPool, error) {
  373. return NewPolicyConnPool(
  374. cfg,
  375. NewRoundRobinHostPolicy(),
  376. NewRoundRobinConnPolicy,
  377. )
  378. }
  379. //NewTokenAwareConnPool creates a connection pool which selects hosts by
  380. //a token aware policy, and then selects a connection for that host by
  381. //round-robin.
  382. func NewTokenAwareConnPool(cfg *ClusterConfig) (ConnectionPool, error) {
  383. return NewPolicyConnPool(
  384. cfg,
  385. NewTokenAwareHostPolicy(NewRoundRobinHostPolicy()),
  386. NewRoundRobinConnPolicy,
  387. )
  388. }
  // policyConnPool is a ConnectionPool that keeps one hostConnPool per host and
  // delegates host selection to a HostSelectionPolicy.
  type policyConnPool struct {
  	// port is the port used for every host connection pool.
  	port int
  	// numConns is the size handed to each host connection pool.
  	numConns int
  	// connCfg is the connection configuration copied from the ClusterConfig
  	// at creation time.
  	connCfg ConnConfig
  	// keyspace is passed on to every host connection pool.
  	keyspace string

  	// mu is held while hostConnPools is read or modified.
  	mu sync.RWMutex
  	// hostPolicy chooses the order in which hosts are tried for a query.
  	hostPolicy HostSelectionPolicy
  	// connPolicy creates the connection selection policy for each new host pool.
  	connPolicy func() ConnSelectionPolicy
  	// hostConnPools maps a host's peer address to its connection pool.
  	hostConnPools map[string]*hostConnPool
  }
  399. func NewPolicyConnPool(
  400. cfg *ClusterConfig,
  401. hostPolicy HostSelectionPolicy,
  402. connPolicy func() ConnSelectionPolicy,
  403. ) (ConnectionPool, error) {
  404. var err error
  405. var tlsConfig *tls.Config
  406. if cfg.SslOpts != nil {
  407. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  408. if err != nil {
  409. return nil, err
  410. }
  411. }
  412. // create the pool
  413. pool := &policyConnPool{
  414. port: cfg.Port,
  415. numConns: cfg.NumConns,
  416. connCfg: ConnConfig{
  417. ProtoVersion: cfg.ProtoVersion,
  418. CQLVersion: cfg.CQLVersion,
  419. Timeout: cfg.Timeout,
  420. NumStreams: cfg.NumStreams,
  421. Compressor: cfg.Compressor,
  422. Authenticator: cfg.Authenticator,
  423. Keepalive: cfg.SocketKeepalive,
  424. tlsConfig: tlsConfig,
  425. },
  426. keyspace: cfg.Keyspace,
  427. hostPolicy: hostPolicy,
  428. connPolicy: connPolicy,
  429. hostConnPools: map[string]*hostConnPool{},
  430. }
  431. hosts := make([]HostInfo, len(cfg.Hosts))
  432. for i, hostAddr := range cfg.Hosts {
  433. hosts[i].Peer = hostAddr
  434. }
  435. pool.SetHosts(hosts)
  436. return pool, nil
  437. }
  438. func (p *policyConnPool) SetHosts(hosts []HostInfo) {
  439. p.mu.Lock()
  440. toRemove := make(map[string]struct{})
  441. for addr := range p.hostConnPools {
  442. toRemove[addr] = struct{}{}
  443. }
  444. // TODO connect to hosts in parallel, but wait for pools to be
  445. // created before returning
  446. for i := range hosts {
  447. pool, exists := p.hostConnPools[hosts[i].Peer]
  448. if !exists {
  449. // create a connection pool for the host
  450. pool = newHostConnPool(
  451. hosts[i].Peer,
  452. p.port,
  453. p.numConns,
  454. p.connCfg,
  455. p.keyspace,
  456. p.connPolicy(),
  457. )
  458. p.hostConnPools[hosts[i].Peer] = pool
  459. } else {
  460. // still have this host, so don't remove it
  461. delete(toRemove, hosts[i].Peer)
  462. }
  463. }
  464. for addr := range toRemove {
  465. pool := p.hostConnPools[addr]
  466. delete(p.hostConnPools, addr)
  467. pool.Close()
  468. }
  469. // update the policy
  470. p.hostPolicy.SetHosts(hosts)
  471. p.mu.Unlock()
  472. }
  473. func (p *policyConnPool) SetPartitioner(partitioner string) {
  474. p.hostPolicy.SetPartitioner(partitioner)
  475. }
  476. func (p *policyConnPool) Size() int {
  477. p.mu.RLock()
  478. count := 0
  479. for _, pool := range p.hostConnPools {
  480. count += pool.Size()
  481. }
  482. p.mu.RUnlock()
  483. return count
  484. }
  485. func (p *policyConnPool) Pick(qry *Query) *Conn {
  486. nextHost := p.hostPolicy.Pick(qry)
  487. p.mu.RLock()
  488. var host *HostInfo
  489. var conn *Conn
  490. for conn == nil {
  491. host = nextHost()
  492. if host == nil {
  493. break
  494. }
  495. conn = p.hostConnPools[host.Peer].Pick(qry)
  496. }
  497. p.mu.RUnlock()
  498. return conn
  499. }
  500. func (p *policyConnPool) Close() {
  501. p.mu.Lock()
  502. // remove the hosts from the policy
  503. p.hostPolicy.SetHosts([]HostInfo{})
  504. // close the pools
  505. for addr, pool := range p.hostConnPools {
  506. delete(p.hostConnPools, addr)
  507. pool.Close()
  508. }
  509. p.mu.Unlock()
  510. }
  // hostConnPool is a connection pool for a single host.
  // Connection selection is based on a provided ConnSelectionPolicy
  type hostConnPool struct {
  	// host and port identify the node; addr is JoinHostPort(host, port) and
  	// is the address connections are dialed to.
  	host string
  	port int
  	addr string
  	// size is the number of connections fill tries to reach.
  	size int
  	// connCfg is the configuration used for every new connection.
  	connCfg ConnConfig
  	// keyspace, when non-empty, is set on each new connection before it is
  	// added to the pool.
  	keyspace string
  	// policy picks a connection for a query and is given the current
  	// connection list whenever that list changes.
  	policy ConnSelectionPolicy
  	// protection for conns, closed, filling
  	mu sync.Mutex
  	// conns holds the connections currently in the pool.
  	conns []*Conn
  	// closed is set by Close; a closed pool is never filled again.
  	closed bool
  	// filling is true while a fill is in progress, and for a short delay
  	// afterwards (see fillingStopped).
  	filling bool
  }
  527. func newHostConnPool(
  528. host string,
  529. port int,
  530. size int,
  531. connCfg ConnConfig,
  532. keyspace string,
  533. policy ConnSelectionPolicy,
  534. ) *hostConnPool {
  535. pool := &hostConnPool{
  536. host: host,
  537. port: port,
  538. addr: JoinHostPort(host, port),
  539. size: size,
  540. connCfg: connCfg,
  541. keyspace: keyspace,
  542. policy: policy,
  543. conns: make([]*Conn, 0, size),
  544. filling: false,
  545. closed: false,
  546. }
  547. // fill the pool with the initial connections before returning
  548. pool.fill()
  549. return pool
  550. }
  551. // Pick a connection from this connection pool for the given query.
  552. func (pool *hostConnPool) Pick(qry *Query) *Conn {
  553. pool.mu.Lock()
  554. if pool.closed {
  555. pool.mu.Unlock()
  556. return nil
  557. }
  558. empty := len(pool.conns) == 0
  559. pool.mu.Unlock()
  560. if empty {
  561. // try to fill the empty pool
  562. pool.fill()
  563. }
  564. return pool.policy.Pick(qry)
  565. }
  566. //Size returns the number of connections currently active in the pool
  567. func (pool *hostConnPool) Size() int {
  568. pool.mu.Lock()
  569. defer pool.mu.Unlock()
  570. return len(pool.conns)
  571. }
  572. //Close the connection pool
  573. func (pool *hostConnPool) Close() {
  574. pool.mu.Lock()
  575. defer pool.mu.Unlock()
  576. if pool.closed {
  577. return
  578. }
  579. pool.closed = true
  580. // drain, but don't wait
  581. go pool.drain()
  582. }
  583. // Fill the connection pool
  584. func (pool *hostConnPool) fill() {
  585. pool.mu.Lock()
  586. // avoid filling a closed pool, or concurrent filling
  587. if pool.closed || pool.filling {
  588. pool.mu.Unlock()
  589. return
  590. }
  591. // determine the filling work to be done
  592. startCount := len(pool.conns)
  593. fillCount := pool.size - startCount
  594. // avoid filling a full (or overfull) pool
  595. if fillCount <= 0 {
  596. pool.mu.Unlock()
  597. return
  598. }
  599. // ok fill the pool
  600. pool.filling = true
  601. // allow others to access the pool while filling
  602. pool.mu.Unlock()
  603. // fill only the first connection synchronously
  604. if startCount == 0 {
  605. err := pool.connect()
  606. if opErr, ok := err.(*net.OpError); ok && opErr.Op == "read" {
  607. // connection refused
  608. // these are typical during a node outage so avoid log spam.
  609. } else if err != nil {
  610. log.Printf("error: failed to connect to %s - %v", pool.addr, err)
  611. }
  612. if err != nil {
  613. // probably unreachable host
  614. go pool.fillingStopped()
  615. return
  616. }
  617. // filled one
  618. fillCount--
  619. }
  620. // fill the rest of the pool asynchronously
  621. go func() {
  622. for fillCount > 0 {
  623. err := pool.connect()
  624. if opErr, ok := err.(*net.OpError); ok && opErr.Op == "read" {
  625. // connection refused
  626. // these are typical during a node outage so avoid log spam.
  627. } else if err != nil {
  628. log.Printf("error: failed to connect to %s - %v", pool.addr, err)
  629. }
  630. // decrement, even on error
  631. fillCount--
  632. }
  633. // mark the end of filling
  634. pool.fillingStopped()
  635. }()
  636. }
  637. // transition back to a not-filling state.
  638. func (pool *hostConnPool) fillingStopped() {
  639. // wait for some time to avoid back-to-back filling
  640. time.Sleep(100 * time.Millisecond)
  641. pool.mu.Lock()
  642. pool.filling = false
  643. pool.mu.Unlock()
  644. }
  645. // create a new connection to the host and add it to the pool
  646. func (pool *hostConnPool) connect() error {
  647. // try to connect
  648. conn, err := Connect(pool.addr, pool.connCfg, pool)
  649. if err != nil {
  650. return err
  651. }
  652. if pool.keyspace != "" {
  653. // set the keyspace
  654. if err := conn.UseKeyspace(pool.keyspace); err != nil {
  655. conn.Close()
  656. return err
  657. }
  658. }
  659. // add the Conn to the pool
  660. pool.mu.Lock()
  661. defer pool.mu.Unlock()
  662. if pool.closed {
  663. conn.Close()
  664. return nil
  665. }
  666. pool.conns = append(pool.conns, conn)
  667. pool.policy.SetConns(pool.conns)
  668. return nil
  669. }
  670. // handle any error from a Conn
  671. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  672. if !closed {
  673. // still an open connection, so continue using it
  674. return
  675. }
  676. pool.mu.Lock()
  677. defer pool.mu.Unlock()
  678. // find the connection index
  679. for i, candidate := range pool.conns {
  680. if candidate == conn {
  681. // remove the connection, not preserving order
  682. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  683. // update the policy
  684. pool.policy.SetConns(pool.conns)
  685. // lost a connection, so fill the pool
  686. go pool.fill()
  687. break
  688. }
  689. }
  690. }
  691. // removes and closes all connections from the pool
  692. func (pool *hostConnPool) drain() {
  693. pool.mu.Lock()
  694. defer pool.mu.Unlock()
  695. // empty the pool
  696. conns := pool.conns
  697. pool.conns = pool.conns[:0]
  698. // update the policy
  699. pool.policy.SetConns(pool.conns)
  700. // close the connections
  701. for _, conn := range conns {
  702. conn.Close()
  703. }
  704. }