connectionpool.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "crypto/tls"
  7. "crypto/x509"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "math/rand"
  12. "net"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. // interface to implement to receive the host information
  18. type SetHosts interface {
  19. SetHosts(hosts []*HostInfo)
  20. }
  // SetPartitioner is implemented by components that want to be handed the
// cluster's partitioner name as a string.
type SetPartitioner interface {
	SetPartitioner(partitioner string)
}
  25. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  26. if sslOpts.Config == nil {
  27. sslOpts.Config = &tls.Config{}
  28. }
  29. // ca cert is optional
  30. if sslOpts.CaPath != "" {
  31. if sslOpts.RootCAs == nil {
  32. sslOpts.RootCAs = x509.NewCertPool()
  33. }
  34. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  35. if err != nil {
  36. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  37. }
  38. if !sslOpts.RootCAs.AppendCertsFromPEM(pem) {
  39. return nil, errors.New("connectionpool: failed parsing or CA certs")
  40. }
  41. }
  42. if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
  43. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  44. if err != nil {
  45. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  46. }
  47. sslOpts.Certificates = append(sslOpts.Certificates, mycert)
  48. }
  49. sslOpts.InsecureSkipVerify = !sslOpts.EnableHostVerification
  50. return sslOpts.Config, nil
  51. }
  52. type policyConnPool struct {
  53. session *Session
  54. port int
  55. numConns int
  56. keyspace string
  57. mu sync.RWMutex
  58. hostConnPools map[string]*hostConnPool
  59. endpoints []string
  60. }
  61. func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
  62. var (
  63. err error
  64. tlsConfig *tls.Config
  65. )
  66. // TODO(zariel): move tls config setup into session init.
  67. if cfg.SslOpts != nil {
  68. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  69. if err != nil {
  70. return nil, err
  71. }
  72. }
  73. return &ConnConfig{
  74. ProtoVersion: cfg.ProtoVersion,
  75. CQLVersion: cfg.CQLVersion,
  76. Timeout: cfg.Timeout,
  77. ConnectTimeout: cfg.ConnectTimeout,
  78. Compressor: cfg.Compressor,
  79. Authenticator: cfg.Authenticator,
  80. Keepalive: cfg.SocketKeepalive,
  81. tlsConfig: tlsConfig,
  82. }, nil
  83. }
  84. func newPolicyConnPool(session *Session) *policyConnPool {
  85. // create the pool
  86. pool := &policyConnPool{
  87. session: session,
  88. port: session.cfg.Port,
  89. numConns: session.cfg.NumConns,
  90. keyspace: session.cfg.Keyspace,
  91. hostConnPools: map[string]*hostConnPool{},
  92. }
  93. pool.endpoints = make([]string, len(session.cfg.Hosts))
  94. copy(pool.endpoints, session.cfg.Hosts)
  95. return pool
  96. }
  97. func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
  98. p.mu.Lock()
  99. defer p.mu.Unlock()
  100. toRemove := make(map[string]struct{})
  101. for addr := range p.hostConnPools {
  102. toRemove[addr] = struct{}{}
  103. }
  104. pools := make(chan *hostConnPool)
  105. createCount := 0
  106. for _, host := range hosts {
  107. if !host.IsUp() {
  108. // don't create a connection pool for a down host
  109. continue
  110. }
  111. ip := host.ConnectAddress().String()
  112. if _, exists := p.hostConnPools[ip]; exists {
  113. // still have this host, so don't remove it
  114. delete(toRemove, ip)
  115. continue
  116. }
  117. createCount++
  118. go func(host *HostInfo) {
  119. // create a connection pool for the host
  120. pools <- newHostConnPool(
  121. p.session,
  122. host,
  123. p.port,
  124. p.numConns,
  125. p.keyspace,
  126. )
  127. }(host)
  128. }
  129. // add created pools
  130. for createCount > 0 {
  131. pool := <-pools
  132. createCount--
  133. if pool.Size() > 0 {
  134. // add pool only if there a connections available
  135. p.hostConnPools[string(pool.host.ConnectAddress())] = pool
  136. }
  137. }
  138. for addr := range toRemove {
  139. pool := p.hostConnPools[addr]
  140. delete(p.hostConnPools, addr)
  141. go pool.Close()
  142. }
  143. }
  144. func (p *policyConnPool) Size() int {
  145. p.mu.RLock()
  146. count := 0
  147. for _, pool := range p.hostConnPools {
  148. count += pool.Size()
  149. }
  150. p.mu.RUnlock()
  151. return count
  152. }
  153. func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) {
  154. ip := host.ConnectAddress().String()
  155. p.mu.RLock()
  156. pool, ok = p.hostConnPools[ip]
  157. p.mu.RUnlock()
  158. return
  159. }
  160. func (p *policyConnPool) Close() {
  161. p.mu.Lock()
  162. defer p.mu.Unlock()
  163. // close the pools
  164. for addr, pool := range p.hostConnPools {
  165. delete(p.hostConnPools, addr)
  166. pool.Close()
  167. }
  168. }
  169. func (p *policyConnPool) addHost(host *HostInfo) {
  170. ip := host.ConnectAddress().String()
  171. p.mu.Lock()
  172. pool, ok := p.hostConnPools[ip]
  173. if !ok {
  174. pool = newHostConnPool(
  175. p.session,
  176. host,
  177. host.Port(), // TODO: if port == 0 use pool.port?
  178. p.numConns,
  179. p.keyspace,
  180. )
  181. p.hostConnPools[ip] = pool
  182. }
  183. p.mu.Unlock()
  184. pool.fill()
  185. }
  186. func (p *policyConnPool) removeHost(ip net.IP) {
  187. k := ip.String()
  188. p.mu.Lock()
  189. pool, ok := p.hostConnPools[k]
  190. if !ok {
  191. p.mu.Unlock()
  192. return
  193. }
  194. delete(p.hostConnPools, k)
  195. p.mu.Unlock()
  196. go pool.Close()
  197. }
  198. func (p *policyConnPool) hostUp(host *HostInfo) {
  199. // TODO(zariel): have a set of up hosts and down hosts, we can internally
  200. // detect down hosts, then try to reconnect to them.
  201. p.addHost(host)
  202. }
  203. func (p *policyConnPool) hostDown(ip net.IP) {
  204. // TODO(zariel): mark host as down so we can try to connect to it later, for
  205. // now just treat it has removed.
  206. p.removeHost(ip)
  207. }
  208. // hostConnPool is a connection pool for a single host.
  209. // Connection selection is based on a provided ConnSelectionPolicy
  210. type hostConnPool struct {
  211. session *Session
  212. host *HostInfo
  213. port int
  214. addr string
  215. size int
  216. keyspace string
  217. // protection for conns, closed, filling
  218. mu sync.RWMutex
  219. conns []*Conn
  220. closed bool
  221. filling bool
  222. pos uint32
  223. }
  224. func (h *hostConnPool) String() string {
  225. h.mu.RLock()
  226. defer h.mu.RUnlock()
  227. return fmt.Sprintf("[filling=%v closed=%v conns=%v size=%v host=%v]",
  228. h.filling, h.closed, len(h.conns), h.size, h.host)
  229. }
  230. func newHostConnPool(session *Session, host *HostInfo, port, size int,
  231. keyspace string) *hostConnPool {
  232. pool := &hostConnPool{
  233. session: session,
  234. host: host,
  235. port: port,
  236. addr: (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(),
  237. size: size,
  238. keyspace: keyspace,
  239. conns: make([]*Conn, 0, size),
  240. filling: false,
  241. closed: false,
  242. }
  243. // the pool is not filled or connected
  244. return pool
  245. }
  246. // Pick a connection from this connection pool for the given query.
  247. func (pool *hostConnPool) Pick() *Conn {
  248. pool.mu.RLock()
  249. defer pool.mu.RUnlock()
  250. if pool.closed {
  251. return nil
  252. }
  253. size := len(pool.conns)
  254. if size < pool.size {
  255. // try to fill the pool
  256. go pool.fill()
  257. if size == 0 {
  258. return nil
  259. }
  260. }
  261. pos := int(atomic.AddUint32(&pool.pos, 1) - 1)
  262. var (
  263. leastBusyConn *Conn
  264. streamsAvailable int
  265. )
  266. // find the conn which has the most available streams, this is racy
  267. for i := 0; i < size; i++ {
  268. conn := pool.conns[(pos+i)%size]
  269. if streams := conn.AvailableStreams(); streams > streamsAvailable {
  270. leastBusyConn = conn
  271. streamsAvailable = streams
  272. }
  273. }
  274. return leastBusyConn
  275. }
  276. //Size returns the number of connections currently active in the pool
  277. func (pool *hostConnPool) Size() int {
  278. pool.mu.RLock()
  279. defer pool.mu.RUnlock()
  280. return len(pool.conns)
  281. }
  282. //Close the connection pool
  283. func (pool *hostConnPool) Close() {
  284. pool.mu.Lock()
  285. defer pool.mu.Unlock()
  286. if pool.closed {
  287. return
  288. }
  289. pool.closed = true
  290. pool.drainLocked()
  291. }
  292. // Fill the connection pool
  293. func (pool *hostConnPool) fill() {
  294. pool.mu.RLock()
  295. // avoid filling a closed pool, or concurrent filling
  296. if pool.closed || pool.filling {
  297. pool.mu.RUnlock()
  298. return
  299. }
  300. // determine the filling work to be done
  301. startCount := len(pool.conns)
  302. fillCount := pool.size - startCount
  303. // avoid filling a full (or overfull) pool
  304. if fillCount <= 0 {
  305. pool.mu.RUnlock()
  306. return
  307. }
  308. // switch from read to write lock
  309. pool.mu.RUnlock()
  310. pool.mu.Lock()
  311. // double check everything since the lock was released
  312. startCount = len(pool.conns)
  313. fillCount = pool.size - startCount
  314. if pool.closed || pool.filling || fillCount <= 0 {
  315. // looks like another goroutine already beat this
  316. // goroutine to the filling
  317. pool.mu.Unlock()
  318. return
  319. }
  320. // ok fill the pool
  321. pool.filling = true
  322. // allow others to access the pool while filling
  323. pool.mu.Unlock()
  324. // only this goroutine should make calls to fill/empty the pool at this
  325. // point until after this routine or its subordinates calls
  326. // fillingStopped
  327. // fill only the first connection synchronously
  328. if startCount == 0 {
  329. err := pool.connect()
  330. pool.logConnectErr(err)
  331. if err != nil {
  332. // probably unreachable host
  333. pool.fillingStopped(true)
  334. // this is call with the connection pool mutex held, this call will
  335. // then recursively try to lock it again. FIXME
  336. go pool.session.handleNodeDown(pool.host.ConnectAddress(), pool.port)
  337. return
  338. }
  339. // filled one
  340. fillCount--
  341. }
  342. // fill the rest of the pool asynchronously
  343. go func() {
  344. err := pool.connectMany(fillCount)
  345. // mark the end of filling
  346. pool.fillingStopped(err != nil)
  347. }()
  348. }
  349. func (pool *hostConnPool) logConnectErr(err error) {
  350. if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
  351. // connection refused
  352. // these are typical during a node outage so avoid log spam.
  353. if gocqlDebug {
  354. Logger.Printf("unable to dial %q: %v\n", pool.host.ConnectAddress(), err)
  355. }
  356. } else if err != nil {
  357. // unexpected error
  358. Logger.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
  359. }
  360. }
  361. // transition back to a not-filling state.
  362. func (pool *hostConnPool) fillingStopped(hadError bool) {
  363. if hadError {
  364. // wait for some time to avoid back-to-back filling
  365. // this provides some time between failed attempts
  366. // to fill the pool for the host to recover
  367. time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
  368. }
  369. pool.mu.Lock()
  370. pool.filling = false
  371. pool.mu.Unlock()
  372. }
  373. // connectMany creates new connections concurrent.
  374. func (pool *hostConnPool) connectMany(count int) error {
  375. if count == 0 {
  376. return nil
  377. }
  378. var (
  379. wg sync.WaitGroup
  380. mu sync.Mutex
  381. connectErr error
  382. )
  383. wg.Add(count)
  384. for i := 0; i < count; i++ {
  385. go func() {
  386. defer wg.Done()
  387. err := pool.connect()
  388. pool.logConnectErr(err)
  389. if err != nil {
  390. mu.Lock()
  391. connectErr = err
  392. mu.Unlock()
  393. }
  394. }()
  395. }
  396. // wait for all connections are done
  397. wg.Wait()
  398. return connectErr
  399. }
  400. // create a new connection to the host and add it to the pool
  401. func (pool *hostConnPool) connect() (err error) {
  402. // TODO: provide a more robust connection retry mechanism, we should also
  403. // be able to detect hosts that come up by trying to connect to downed ones.
  404. const maxAttempts = 3
  405. // try to connect
  406. var conn *Conn
  407. for i := 0; i < maxAttempts; i++ {
  408. conn, err = pool.session.connect(pool.host, pool)
  409. if err == nil {
  410. break
  411. }
  412. if opErr, isOpErr := err.(*net.OpError); isOpErr {
  413. // if the error is not a temporary error (ex: network unreachable) don't
  414. // retry
  415. if !opErr.Temporary() {
  416. break
  417. }
  418. }
  419. }
  420. if err != nil {
  421. return err
  422. }
  423. if pool.keyspace != "" {
  424. // set the keyspace
  425. if err = conn.UseKeyspace(pool.keyspace); err != nil {
  426. conn.Close()
  427. return err
  428. }
  429. }
  430. // add the Conn to the pool
  431. pool.mu.Lock()
  432. defer pool.mu.Unlock()
  433. if pool.closed {
  434. conn.Close()
  435. return nil
  436. }
  437. pool.conns = append(pool.conns, conn)
  438. return nil
  439. }
  440. // handle any error from a Conn
  441. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  442. if !closed {
  443. // still an open connection, so continue using it
  444. return
  445. }
  446. // TODO: track the number of errors per host and detect when a host is dead,
  447. // then also have something which can detect when a host comes back.
  448. pool.mu.Lock()
  449. defer pool.mu.Unlock()
  450. if pool.closed {
  451. // pool closed
  452. return
  453. }
  454. // find the connection index
  455. for i, candidate := range pool.conns {
  456. if candidate == conn {
  457. // remove the connection, not preserving order
  458. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  459. // lost a connection, so fill the pool
  460. go pool.fill()
  461. break
  462. }
  463. }
  464. }
  465. func (pool *hostConnPool) drainLocked() {
  466. // empty the pool
  467. conns := pool.conns
  468. pool.conns = nil
  469. // close the connections
  470. for _, conn := range conns {
  471. conn.Close()
  472. }
  473. }
  474. // removes and closes all connections from the pool
  475. func (pool *hostConnPool) drain() {
  476. pool.mu.Lock()
  477. defer pool.mu.Unlock()
  478. pool.drainLocked()
  479. }