connectionpool.go
// Copyright (c) 2012 The gocql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package gocql

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// SetHosts is the interface to implement to receive the host information.
type SetHosts interface {
	SetHosts(hosts []*HostInfo)
}

// SetPartitioner is the interface to implement to receive the partitioner value.
type SetPartitioner interface {
	SetPartitioner(partitioner string)
}

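// A minimal sketch (not part of gocql) of a consumer of the two interfaces
// above, assuming a hypothetical type name; it only logs the updates it
// receives:
//
//	type topologyLogger struct{}
//
//	func (t *topologyLogger) SetHosts(hosts []*HostInfo) {
//		for _, host := range hosts {
//			Logger.Printf("host %s up=%v", host.Peer(), host.IsUp())
//		}
//	}
//
//	func (t *topologyLogger) SetPartitioner(partitioner string) {
//		Logger.Printf("partitioner: %s", partitioner)
//	}
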
func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
	// ca cert is optional
	if sslOpts.CaPath != "" {
		if sslOpts.RootCAs == nil {
			sslOpts.RootCAs = x509.NewCertPool()
		}

		pem, err := ioutil.ReadFile(sslOpts.CaPath)
		if err != nil {
			return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
		}

		if !sslOpts.RootCAs.AppendCertsFromPEM(pem) {
			return nil, errors.New("connectionpool: failed parsing CA certs")
		}
	}

	if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
		mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
		if err != nil {
			return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
		}
		sslOpts.Certificates = append(sslOpts.Certificates, mycert)
	}

	sslOpts.InsecureSkipVerify = !sslOpts.EnableHostVerification

	return &sslOpts.Config, nil
}

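// A minimal usage sketch for setupTLSConfig, assuming the certificate and
// key files below exist on disk (the paths are illustrative only):
//
//	sslOpts := &SslOptions{
//		CaPath:                 "/etc/cassandra/ca.pem",
//		CertPath:               "/etc/cassandra/client.crt",
//		KeyPath:                "/etc/cassandra/client.key",
//		EnableHostVerification: true,
//	}
//	tlsConfig, err := setupTLSConfig(sslOpts)
//	if err != nil {
//		// handle the configuration error
//	}
//	_ = tlsConfig // wired into a ConnConfig by connConfig below
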
type policyConnPool struct {
	session *Session

	port     int
	numConns int
	keyspace string

	mu            sync.RWMutex
	hostConnPools map[string]*hostConnPool

	endpoints []string
}

func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
	var (
		err       error
		tlsConfig *tls.Config
	)

	// TODO(zariel): move tls config setup into session init.
	if cfg.SslOpts != nil {
		tlsConfig, err = setupTLSConfig(cfg.SslOpts)
		if err != nil {
			return nil, err
		}
	}

	return &ConnConfig{
		ProtoVersion:   cfg.ProtoVersion,
		CQLVersion:     cfg.CQLVersion,
		Timeout:        cfg.Timeout,
		ConnectTimeout: cfg.ConnectTimeout,
		Compressor:     cfg.Compressor,
		Authenticator:  cfg.Authenticator,
		Keepalive:      cfg.SocketKeepalive,
		tlsConfig:      tlsConfig,
	}, nil
}

func newPolicyConnPool(session *Session) *policyConnPool {
	// create the pool
	pool := &policyConnPool{
		session:       session,
		port:          session.cfg.Port,
		numConns:      session.cfg.NumConns,
		keyspace:      session.cfg.Keyspace,
		hostConnPools: map[string]*hostConnPool{},
	}

	pool.endpoints = make([]string, len(session.cfg.Hosts))
	copy(pool.endpoints, session.cfg.Hosts)

	return pool
}

func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
	p.mu.Lock()
	defer p.mu.Unlock()

	toRemove := make(map[string]struct{})
	for addr := range p.hostConnPools {
		toRemove[addr] = struct{}{}
	}

	pools := make(chan *hostConnPool)
	createCount := 0
	for _, host := range hosts {
		if !host.IsUp() {
			// don't create a connection pool for a down host
			continue
		}
		ip := host.Peer().String()
		if _, exists := p.hostConnPools[ip]; exists {
			// still have this host, so don't remove it
			delete(toRemove, ip)
			continue
		}

		createCount++

		go func(host *HostInfo) {
			// create a connection pool for the host
			pools <- newHostConnPool(
				p.session,
				host,
				p.port,
				p.numConns,
				p.keyspace,
			)
		}(host)
	}

	// add created pools
	for createCount > 0 {
		pool := <-pools
		createCount--
		if pool.Size() > 0 {
			// add the pool only if it has connections available
			p.hostConnPools[pool.host.Peer().String()] = pool
		}
	}

	for addr := range toRemove {
		pool := p.hostConnPools[addr]
		delete(p.hostConnPools, addr)
		go pool.Close()
	}
}

func (p *policyConnPool) Size() int {
	p.mu.RLock()
	count := 0
	for _, pool := range p.hostConnPools {
		count += pool.Size()
	}
	p.mu.RUnlock()

	return count
}

func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) {
	ip := host.Peer().String()
	p.mu.RLock()
	pool, ok = p.hostConnPools[ip]
	p.mu.RUnlock()
	return
}

func (p *policyConnPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	// close the pools
	for addr, pool := range p.hostConnPools {
		delete(p.hostConnPools, addr)
		pool.Close()
	}
}

func (p *policyConnPool) addHost(host *HostInfo) {
	ip := host.Peer().String()
	p.mu.Lock()
	pool, ok := p.hostConnPools[ip]
	if !ok {
		pool = newHostConnPool(
			p.session,
			host,
			host.Port(), // TODO: if port == 0 use pool.port?
			p.numConns,
			p.keyspace,
		)

		p.hostConnPools[ip] = pool
	}
	p.mu.Unlock()

	pool.fill()
}

func (p *policyConnPool) removeHost(ip net.IP) {
	k := ip.String()
	p.mu.Lock()
	pool, ok := p.hostConnPools[k]
	if !ok {
		p.mu.Unlock()
		return
	}

	delete(p.hostConnPools, k)
	p.mu.Unlock()

	go pool.Close()
}

func (p *policyConnPool) hostUp(host *HostInfo) {
	// TODO(zariel): have a set of up hosts and down hosts, we can internally
	// detect down hosts, then try to reconnect to them.
	p.addHost(host)
}

func (p *policyConnPool) hostDown(ip net.IP) {
	// TODO(zariel): mark host as down so we can try to connect to it later; for
	// now just treat it as removed.
	p.removeHost(ip)
}

// hostConnPool is a connection pool for a single host.
// Connection selection is based on a provided ConnSelectionPolicy.
type hostConnPool struct {
	session  *Session
	host     *HostInfo
	port     int
	addr     string
	size     int
	keyspace string
	// protection for conns, closed, filling
	mu      sync.RWMutex
	conns   []*Conn
	closed  bool
	filling bool

	pos uint32
}

func (h *hostConnPool) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[filling=%v closed=%v conns=%v size=%v host=%v]",
		h.filling, h.closed, len(h.conns), h.size, h.host)
}

func newHostConnPool(session *Session, host *HostInfo, port, size int,
	keyspace string) *hostConnPool {

	pool := &hostConnPool{
		session:  session,
		host:     host,
		port:     port,
		addr:     (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String(),
		size:     size,
		keyspace: keyspace,
		conns:    make([]*Conn, 0, size),
		filling:  false,
		closed:   false,
	}

	// the pool is not filled or connected
	return pool
}

// Pick returns a connection from the pool, preferring the connection with
// the most available streams.
func (pool *hostConnPool) Pick() *Conn {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	if pool.closed {
		return nil
	}

	size := len(pool.conns)
	if size < pool.size {
		// try to fill the pool
		go pool.fill()

		if size == 0 {
			return nil
		}
	}

	pos := int(atomic.AddUint32(&pool.pos, 1) - 1)

	var (
		leastBusyConn    *Conn
		streamsAvailable int
	)

	// find the conn which has the most available streams; this is racy
	for i := 0; i < size; i++ {
		conn := pool.conns[(pos+i)%size]
		if streams := conn.AvailableStreams(); streams > streamsAvailable {
			leastBusyConn = conn
			streamsAvailable = streams
		}
	}

	return leastBusyConn
}

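// For example, with three pooled connections offering 2, 7, and 5 available
// streams and a rotating offset of 1, Pick scans them in the order 7, 5, 2
// and returns the connection with 7 free streams. Because the comparison is
// strict, the rotating offset decides ties between equally idle connections.
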
// Size returns the number of connections currently active in the pool.
func (pool *hostConnPool) Size() int {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	return len(pool.conns)
}

// Close closes the connection pool.
func (pool *hostConnPool) Close() {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	if pool.closed {
		return
	}
	pool.closed = true

	pool.drainLocked()
}

// Fill the connection pool.
func (pool *hostConnPool) fill() {
	pool.mu.RLock()
	// avoid filling a closed pool, or concurrent filling
	if pool.closed || pool.filling {
		pool.mu.RUnlock()
		return
	}

	// determine the filling work to be done
	startCount := len(pool.conns)
	fillCount := pool.size - startCount

	// avoid filling a full (or overfull) pool
	if fillCount <= 0 {
		pool.mu.RUnlock()
		return
	}

	// switch from read to write lock
	pool.mu.RUnlock()
	pool.mu.Lock()

	// double check everything since the lock was released
	startCount = len(pool.conns)
	fillCount = pool.size - startCount
	if pool.closed || pool.filling || fillCount <= 0 {
		// looks like another goroutine already beat this
		// goroutine to the filling
		pool.mu.Unlock()
		return
	}

	// ok, fill the pool
	pool.filling = true

	// allow others to access the pool while filling
	pool.mu.Unlock()
	// only this goroutine should make calls to fill/empty the pool at this
	// point until after this routine (or one of its subordinates) calls
	// fillingStopped

	// fill only the first connection synchronously
	if startCount == 0 {
		err := pool.connect()
		pool.logConnectErr(err)

		if err != nil {
			// probably an unreachable host
			pool.fillingStopped(true)

			// this is called with the connection pool mutex held; the call will
			// then recursively try to lock it again. FIXME
			go pool.session.handleNodeDown(pool.host.Peer(), pool.port)
			return
		}

		// filled one
		fillCount--
	}

	// fill the rest of the pool asynchronously
	go func() {
		err := pool.connectMany(fillCount)

		// mark the end of filling
		pool.fillingStopped(err != nil)
	}()
}

func (pool *hostConnPool) logConnectErr(err error) {
	if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
		// connection refused
		// these are typical during a node outage so avoid log spam.
		if gocqlDebug {
			Logger.Printf("unable to dial %q: %v\n", pool.host.Peer(), err)
		}
	} else if err != nil {
		// unexpected error
		Logger.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
	}
}

// transition back to a not-filling state.
func (pool *hostConnPool) fillingStopped(hadError bool) {
	if hadError {
		// wait for some time to avoid back-to-back filling
		// this provides some time between failed attempts
		// to fill the pool for the host to recover
		// (a random delay of 31-130ms)
		time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
	}

	pool.mu.Lock()
	pool.filling = false
	pool.mu.Unlock()
}

// connectMany creates new connections concurrently.
func (pool *hostConnPool) connectMany(count int) error {
	if count == 0 {
		return nil
	}

	var (
		wg         sync.WaitGroup
		mu         sync.Mutex
		connectErr error
	)

	wg.Add(count)
	for i := 0; i < count; i++ {
		go func() {
			defer wg.Done()
			err := pool.connect()
			pool.logConnectErr(err)
			if err != nil {
				mu.Lock()
				connectErr = err
				mu.Unlock()
			}
		}()
	}

	// wait for all connection attempts to complete
	wg.Wait()
	return connectErr
}

// create a new connection to the host and add it to the pool
func (pool *hostConnPool) connect() (err error) {
	// TODO: provide a more robust connection retry mechanism, we should also
	// be able to detect hosts that come up by trying to connect to downed ones.
	const maxAttempts = 3

	// try to connect
	var conn *Conn
	for i := 0; i < maxAttempts; i++ {
		conn, err = pool.session.connect(pool.host, pool)
		if err == nil {
			break
		}
		if opErr, isOpErr := err.(*net.OpError); isOpErr {
			// if the error is not temporary (e.g. network unreachable), don't
			// retry
			if !opErr.Temporary() {
				break
			}
		}
	}

	if err != nil {
		return err
	}

	if pool.keyspace != "" {
		// set the keyspace
		if err = conn.UseKeyspace(pool.keyspace); err != nil {
			conn.Close()
			return err
		}
	}

	// add the Conn to the pool
	pool.mu.Lock()
	defer pool.mu.Unlock()

	if pool.closed {
		conn.Close()
		return nil
	}

	pool.conns = append(pool.conns, conn)

	return nil
}

// HandleError handles any error from a Conn.
func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
	if !closed {
		// still an open connection, so continue using it
		return
	}

	// TODO: track the number of errors per host and detect when a host is dead,
	// then also have something which can detect when a host comes back.
	pool.mu.Lock()
	defer pool.mu.Unlock()

	if pool.closed {
		// pool closed
		return
	}

	// find the connection index
	for i, candidate := range pool.conns {
		if candidate == conn {
			// remove the connection, not preserving order
			pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]

			// lost a connection, so fill the pool
			go pool.fill()
			break
		}
	}
}

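// Note on the swap-remove in HandleError above: the dead connection is
// replaced by the slice's last element and the slice is truncated, so e.g.
// [c0 c1 c2] with c1 closed becomes [c0 c2] in O(1). Order is not preserved,
// which Pick tolerates because it scans from a rotating offset anyway.
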
func (pool *hostConnPool) drainLocked() {
	// empty the pool
	conns := pool.conns
	pool.conns = nil

	// close the connections
	for _, conn := range conns {
		conn.Close()
	}
}

// removes and closes all connections from the pool
func (pool *hostConnPool) drain() {
	pool.mu.Lock()
	defer pool.mu.Unlock()
	pool.drainLocked()
}