connectionpool.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "crypto/tls"
  7. "crypto/x509"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "math/rand"
  12. "net"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. // interface to implement to receive the host information
  18. type SetHosts interface {
  19. SetHosts(hosts []*HostInfo)
  20. }
  21. // interface to implement to receive the partitioner value
  22. type SetPartitioner interface {
  23. SetPartitioner(partitioner string)
  24. }
  25. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  26. if sslOpts.Config == nil {
  27. sslOpts.Config = &tls.Config{}
  28. }
  29. // ca cert is optional
  30. if sslOpts.CaPath != "" {
  31. if sslOpts.RootCAs == nil {
  32. sslOpts.RootCAs = x509.NewCertPool()
  33. }
  34. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  35. if err != nil {
  36. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  37. }
  38. if !sslOpts.RootCAs.AppendCertsFromPEM(pem) {
  39. return nil, errors.New("connectionpool: failed parsing or CA certs")
  40. }
  41. }
  42. if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
  43. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  44. if err != nil {
  45. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  46. }
  47. sslOpts.Certificates = append(sslOpts.Certificates, mycert)
  48. }
  49. sslOpts.InsecureSkipVerify = !sslOpts.EnableHostVerification
  50. // return clone to avoid race
  51. return sslOpts.Config.Clone(), nil
  52. }
  53. type policyConnPool struct {
  54. session *Session
  55. port int
  56. numConns int
  57. keyspace string
  58. mu sync.RWMutex
  59. hostConnPools map[string]*hostConnPool
  60. endpoints []string
  61. }
  62. func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
  63. var (
  64. err error
  65. tlsConfig *tls.Config
  66. )
  67. // TODO(zariel): move tls config setup into session init.
  68. if cfg.SslOpts != nil {
  69. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  70. if err != nil {
  71. return nil, err
  72. }
  73. }
  74. return &ConnConfig{
  75. ProtoVersion: cfg.ProtoVersion,
  76. CQLVersion: cfg.CQLVersion,
  77. Timeout: cfg.Timeout,
  78. ConnectTimeout: cfg.ConnectTimeout,
  79. Dialer: cfg.Dialer,
  80. Compressor: cfg.Compressor,
  81. Authenticator: cfg.Authenticator,
  82. AuthProvider: cfg.AuthProvider,
  83. Keepalive: cfg.SocketKeepalive,
  84. tlsConfig: tlsConfig,
  85. disableCoalesce: tlsConfig != nil, // write coalescing doesn't work with framing on top of TCP like in TLS.
  86. }, nil
  87. }
  88. func newPolicyConnPool(session *Session) *policyConnPool {
  89. // create the pool
  90. pool := &policyConnPool{
  91. session: session,
  92. port: session.cfg.Port,
  93. numConns: session.cfg.NumConns,
  94. keyspace: session.cfg.Keyspace,
  95. hostConnPools: map[string]*hostConnPool{},
  96. }
  97. pool.endpoints = make([]string, len(session.cfg.Hosts))
  98. copy(pool.endpoints, session.cfg.Hosts)
  99. return pool
  100. }
  101. func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
  102. p.mu.Lock()
  103. defer p.mu.Unlock()
  104. toRemove := make(map[string]struct{})
  105. for addr := range p.hostConnPools {
  106. toRemove[addr] = struct{}{}
  107. }
  108. pools := make(chan *hostConnPool)
  109. createCount := 0
  110. for _, host := range hosts {
  111. if !host.IsUp() {
  112. // don't create a connection pool for a down host
  113. continue
  114. }
  115. ip := host.ConnectAddress().String()
  116. if _, exists := p.hostConnPools[ip]; exists {
  117. // still have this host, so don't remove it
  118. delete(toRemove, ip)
  119. continue
  120. }
  121. createCount++
  122. go func(host *HostInfo) {
  123. // create a connection pool for the host
  124. pools <- newHostConnPool(
  125. p.session,
  126. host,
  127. p.port,
  128. p.numConns,
  129. p.keyspace,
  130. )
  131. }(host)
  132. }
  133. // add created pools
  134. for createCount > 0 {
  135. pool := <-pools
  136. createCount--
  137. if pool.Size() > 0 {
  138. // add pool only if there a connections available
  139. p.hostConnPools[string(pool.host.ConnectAddress())] = pool
  140. }
  141. }
  142. for addr := range toRemove {
  143. pool := p.hostConnPools[addr]
  144. delete(p.hostConnPools, addr)
  145. go pool.Close()
  146. }
  147. }
  148. func (p *policyConnPool) Size() int {
  149. p.mu.RLock()
  150. count := 0
  151. for _, pool := range p.hostConnPools {
  152. count += pool.Size()
  153. }
  154. p.mu.RUnlock()
  155. return count
  156. }
  157. func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) {
  158. ip := host.ConnectAddress().String()
  159. p.mu.RLock()
  160. pool, ok = p.hostConnPools[ip]
  161. p.mu.RUnlock()
  162. return
  163. }
  164. func (p *policyConnPool) Close() {
  165. p.mu.Lock()
  166. defer p.mu.Unlock()
  167. // close the pools
  168. for addr, pool := range p.hostConnPools {
  169. delete(p.hostConnPools, addr)
  170. pool.Close()
  171. }
  172. }
  173. func (p *policyConnPool) addHost(host *HostInfo) {
  174. ip := host.ConnectAddress().String()
  175. p.mu.Lock()
  176. pool, ok := p.hostConnPools[ip]
  177. if !ok {
  178. pool = newHostConnPool(
  179. p.session,
  180. host,
  181. host.Port(), // TODO: if port == 0 use pool.port?
  182. p.numConns,
  183. p.keyspace,
  184. )
  185. p.hostConnPools[ip] = pool
  186. }
  187. p.mu.Unlock()
  188. pool.fill()
  189. }
  190. func (p *policyConnPool) removeHost(ip net.IP) {
  191. k := ip.String()
  192. p.mu.Lock()
  193. pool, ok := p.hostConnPools[k]
  194. if !ok {
  195. p.mu.Unlock()
  196. return
  197. }
  198. delete(p.hostConnPools, k)
  199. p.mu.Unlock()
  200. go pool.Close()
  201. }
  202. func (p *policyConnPool) hostUp(host *HostInfo) {
  203. // TODO(zariel): have a set of up hosts and down hosts, we can internally
  204. // detect down hosts, then try to reconnect to them.
  205. p.addHost(host)
  206. }
  207. func (p *policyConnPool) hostDown(ip net.IP) {
  208. // TODO(zariel): mark host as down so we can try to connect to it later, for
  209. // now just treat it has removed.
  210. p.removeHost(ip)
  211. }
  212. // hostConnPool is a connection pool for a single host.
  213. // Connection selection is based on a provided ConnSelectionPolicy
  214. type hostConnPool struct {
  215. session *Session
  216. host *HostInfo
  217. port int
  218. addr string
  219. size int
  220. keyspace string
  221. // protection for conns, closed, filling
  222. mu sync.RWMutex
  223. conns []*Conn
  224. closed bool
  225. filling bool
  226. pos uint32
  227. }
  228. func (h *hostConnPool) String() string {
  229. h.mu.RLock()
  230. defer h.mu.RUnlock()
  231. return fmt.Sprintf("[filling=%v closed=%v conns=%v size=%v host=%v]",
  232. h.filling, h.closed, len(h.conns), h.size, h.host)
  233. }
  234. func newHostConnPool(session *Session, host *HostInfo, port, size int,
  235. keyspace string) *hostConnPool {
  236. pool := &hostConnPool{
  237. session: session,
  238. host: host,
  239. port: port,
  240. addr: (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(),
  241. size: size,
  242. keyspace: keyspace,
  243. conns: make([]*Conn, 0, size),
  244. filling: false,
  245. closed: false,
  246. }
  247. // the pool is not filled or connected
  248. return pool
  249. }
  250. // Pick a connection from this connection pool for the given query.
  251. func (pool *hostConnPool) Pick() *Conn {
  252. pool.mu.RLock()
  253. defer pool.mu.RUnlock()
  254. if pool.closed {
  255. return nil
  256. }
  257. size := len(pool.conns)
  258. if size < pool.size {
  259. // try to fill the pool
  260. go pool.fill()
  261. if size == 0 {
  262. return nil
  263. }
  264. }
  265. pos := int(atomic.AddUint32(&pool.pos, 1) - 1)
  266. var (
  267. leastBusyConn *Conn
  268. streamsAvailable int
  269. )
  270. // find the conn which has the most available streams, this is racy
  271. for i := 0; i < size; i++ {
  272. conn := pool.conns[(pos+i)%size]
  273. if streams := conn.AvailableStreams(); streams > streamsAvailable {
  274. leastBusyConn = conn
  275. streamsAvailable = streams
  276. }
  277. }
  278. return leastBusyConn
  279. }
  280. //Size returns the number of connections currently active in the pool
  281. func (pool *hostConnPool) Size() int {
  282. pool.mu.RLock()
  283. defer pool.mu.RUnlock()
  284. return len(pool.conns)
  285. }
  286. //Close the connection pool
  287. func (pool *hostConnPool) Close() {
  288. pool.mu.Lock()
  289. if pool.closed {
  290. pool.mu.Unlock()
  291. return
  292. }
  293. pool.closed = true
  294. // ensure we dont try to reacquire the lock in handleError
  295. // TODO: improve this as the following can happen
  296. // 1) we have locked pool.mu write lock
  297. // 2) conn.Close calls conn.closeWithError(nil)
  298. // 3) conn.closeWithError calls conn.Close() which returns an error
  299. // 4) conn.closeWithError calls pool.HandleError with the error from conn.Close
  300. // 5) pool.HandleError tries to lock pool.mu
  301. // deadlock
  302. // empty the pool
  303. conns := pool.conns
  304. pool.conns = nil
  305. pool.mu.Unlock()
  306. // close the connections
  307. for _, conn := range conns {
  308. conn.Close()
  309. }
  310. }
  311. // Fill the connection pool
  312. func (pool *hostConnPool) fill() {
  313. pool.mu.RLock()
  314. // avoid filling a closed pool, or concurrent filling
  315. if pool.closed || pool.filling {
  316. pool.mu.RUnlock()
  317. return
  318. }
  319. // determine the filling work to be done
  320. startCount := len(pool.conns)
  321. fillCount := pool.size - startCount
  322. // avoid filling a full (or overfull) pool
  323. if fillCount <= 0 {
  324. pool.mu.RUnlock()
  325. return
  326. }
  327. // switch from read to write lock
  328. pool.mu.RUnlock()
  329. pool.mu.Lock()
  330. // double check everything since the lock was released
  331. startCount = len(pool.conns)
  332. fillCount = pool.size - startCount
  333. if pool.closed || pool.filling || fillCount <= 0 {
  334. // looks like another goroutine already beat this
  335. // goroutine to the filling
  336. pool.mu.Unlock()
  337. return
  338. }
  339. // ok fill the pool
  340. pool.filling = true
  341. // allow others to access the pool while filling
  342. pool.mu.Unlock()
  343. // only this goroutine should make calls to fill/empty the pool at this
  344. // point until after this routine or its subordinates calls
  345. // fillingStopped
  346. // fill only the first connection synchronously
  347. if startCount == 0 {
  348. err := pool.connect()
  349. pool.logConnectErr(err)
  350. if err != nil {
  351. // probably unreachable host
  352. pool.fillingStopped(true)
  353. // this is call with the connection pool mutex held, this call will
  354. // then recursively try to lock it again. FIXME
  355. if pool.session.cfg.ConvictionPolicy.AddFailure(err, pool.host) {
  356. go pool.session.handleNodeDown(pool.host.ConnectAddress(), pool.port)
  357. }
  358. return
  359. }
  360. // filled one
  361. fillCount--
  362. }
  363. // fill the rest of the pool asynchronously
  364. go func() {
  365. err := pool.connectMany(fillCount)
  366. // mark the end of filling
  367. pool.fillingStopped(err != nil)
  368. }()
  369. }
  370. func (pool *hostConnPool) logConnectErr(err error) {
  371. if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
  372. // connection refused
  373. // these are typical during a node outage so avoid log spam.
  374. if gocqlDebug {
  375. Logger.Printf("unable to dial %q: %v\n", pool.host.ConnectAddress(), err)
  376. }
  377. } else if err != nil {
  378. // unexpected error
  379. Logger.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
  380. }
  381. }
  382. // transition back to a not-filling state.
  383. func (pool *hostConnPool) fillingStopped(hadError bool) {
  384. if hadError {
  385. // wait for some time to avoid back-to-back filling
  386. // this provides some time between failed attempts
  387. // to fill the pool for the host to recover
  388. time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
  389. }
  390. pool.mu.Lock()
  391. pool.filling = false
  392. pool.mu.Unlock()
  393. }
  394. // connectMany creates new connections concurrent.
  395. func (pool *hostConnPool) connectMany(count int) error {
  396. if count == 0 {
  397. return nil
  398. }
  399. var (
  400. wg sync.WaitGroup
  401. mu sync.Mutex
  402. connectErr error
  403. )
  404. wg.Add(count)
  405. for i := 0; i < count; i++ {
  406. go func() {
  407. defer wg.Done()
  408. err := pool.connect()
  409. pool.logConnectErr(err)
  410. if err != nil {
  411. mu.Lock()
  412. connectErr = err
  413. mu.Unlock()
  414. }
  415. }()
  416. }
  417. // wait for all connections are done
  418. wg.Wait()
  419. return connectErr
  420. }
  421. // create a new connection to the host and add it to the pool
  422. func (pool *hostConnPool) connect() (err error) {
  423. // TODO: provide a more robust connection retry mechanism, we should also
  424. // be able to detect hosts that come up by trying to connect to downed ones.
  425. // try to connect
  426. var conn *Conn
  427. reconnectionPolicy := pool.session.cfg.ReconnectionPolicy
  428. for i := 0; i < reconnectionPolicy.GetMaxRetries(); i++ {
  429. conn, err = pool.session.connect(pool.session.ctx, pool.host, pool)
  430. if err == nil {
  431. break
  432. }
  433. if opErr, isOpErr := err.(*net.OpError); isOpErr {
  434. // if the error is not a temporary error (ex: network unreachable) don't
  435. // retry
  436. if !opErr.Temporary() {
  437. break
  438. }
  439. }
  440. if gocqlDebug {
  441. Logger.Printf("connection failed %q: %v, reconnecting with %T\n",
  442. pool.host.ConnectAddress(), err, reconnectionPolicy)
  443. }
  444. time.Sleep(reconnectionPolicy.GetInterval(i))
  445. }
  446. if err != nil {
  447. return err
  448. }
  449. if pool.keyspace != "" {
  450. // set the keyspace
  451. if err = conn.UseKeyspace(pool.keyspace); err != nil {
  452. conn.Close()
  453. return err
  454. }
  455. }
  456. // add the Conn to the pool
  457. pool.mu.Lock()
  458. defer pool.mu.Unlock()
  459. if pool.closed {
  460. conn.Close()
  461. return nil
  462. }
  463. pool.conns = append(pool.conns, conn)
  464. return nil
  465. }
  466. // handle any error from a Conn
  467. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  468. if !closed {
  469. // still an open connection, so continue using it
  470. return
  471. }
  472. // TODO: track the number of errors per host and detect when a host is dead,
  473. // then also have something which can detect when a host comes back.
  474. pool.mu.Lock()
  475. defer pool.mu.Unlock()
  476. if pool.closed {
  477. // pool closed
  478. return
  479. }
  480. // find the connection index
  481. for i, candidate := range pool.conns {
  482. if candidate == conn {
  483. // remove the connection, not preserving order
  484. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  485. // lost a connection, so fill the pool
  486. go pool.fill()
  487. break
  488. }
  489. }
  490. }