connectionpool.go

// Copyright (c) 2012 The gocql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package gocql

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"math/rand"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// interface to implement to receive the host information
type SetHosts interface {
	SetHosts(hosts []*HostInfo)
}

// interface to implement to receive the partitioner value
type SetPartitioner interface {
	SetPartitioner(partitioner string)
}

func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
	// ca cert is optional
	if sslOpts.CaPath != "" {
		if sslOpts.RootCAs == nil {
			sslOpts.RootCAs = x509.NewCertPool()
		}

		pem, err := ioutil.ReadFile(sslOpts.CaPath)
		if err != nil {
			return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
		}

		if !sslOpts.RootCAs.AppendCertsFromPEM(pem) {
			return nil, errors.New("connectionpool: failed parsing CA certs")
		}
	}

	if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
		mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
		if err != nil {
			return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
		}
		sslOpts.Certificates = append(sslOpts.Certificates, mycert)
	}

	sslOpts.InsecureSkipVerify = !sslOpts.EnableHostVerification

	return &sslOpts.Config, nil
}

type policyConnPool struct {
	session       *Session
	port          int
	numConns      int
	keyspace      string
	mu            sync.RWMutex
	hostConnPools map[string]*hostConnPool
	endpoints     []string
}

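// connConfig builds a ConnConfig from the session's cluster configuration,
// setting up TLS when SslOpts is provided.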
func connConfig(session *Session) (*ConnConfig, error) {
	cfg := session.cfg

	var (
		err       error
		tlsConfig *tls.Config
	)

	// TODO(zariel): move tls config setup into session init.
	if cfg.SslOpts != nil {
		tlsConfig, err = setupTLSConfig(cfg.SslOpts)
		if err != nil {
			return nil, err
		}
	}

	return &ConnConfig{
		ProtoVersion:  cfg.ProtoVersion,
		CQLVersion:    cfg.CQLVersion,
		Timeout:       cfg.Timeout,
		Compressor:    cfg.Compressor,
		Authenticator: cfg.Authenticator,
		Keepalive:     cfg.SocketKeepalive,
		tlsConfig:     tlsConfig,
	}, nil
}

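// newPolicyConnPool creates an empty policyConnPool from the session's
// cluster configuration; per-host pools are created later via SetHosts
// or addHost.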
func newPolicyConnPool(session *Session) *policyConnPool {
	// create the pool
	pool := &policyConnPool{
		session:       session,
		port:          session.cfg.Port,
		numConns:      session.cfg.NumConns,
		keyspace:      session.cfg.Keyspace,
		hostConnPools: map[string]*hostConnPool{},
	}

	pool.endpoints = make([]string, len(session.cfg.Hosts))
	copy(pool.endpoints, session.cfg.Hosts)

	return pool
}

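// SetHosts reconciles the pool set with the given hosts: pools are created
// concurrently for new up hosts, kept for hosts already present, and closed
// for hosts no longer in the list.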
func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
	p.mu.Lock()
	defer p.mu.Unlock()

	toRemove := make(map[string]struct{})
	for addr := range p.hostConnPools {
		toRemove[addr] = struct{}{}
	}

	pools := make(chan *hostConnPool)
	createCount := 0
	for _, host := range hosts {
		if !host.IsUp() {
			// don't create a connection pool for a down host
			continue
		}
		ip := host.Peer().String()
		if _, exists := p.hostConnPools[ip]; exists {
			// still have this host, so don't remove it
			delete(toRemove, ip)
			continue
		}

		createCount++

		go func(host *HostInfo) {
			// create a connection pool for the host
			pools <- newHostConnPool(
				p.session,
				host,
				p.port,
				p.numConns,
				p.keyspace,
			)
		}(host)
	}

	// add created pools
	for createCount > 0 {
		pool := <-pools
		createCount--
		if pool.Size() > 0 {
			// add the pool only if there are connections available
			p.hostConnPools[pool.host.Peer().String()] = pool
		}
	}

	for addr := range toRemove {
		pool := p.hostConnPools[addr]
		delete(p.hostConnPools, addr)
		go pool.Close()
	}
}

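// Size returns the total number of connections currently open across all
// host pools.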
func (p *policyConnPool) Size() int {
	p.mu.RLock()
	count := 0
	for _, pool := range p.hostConnPools {
		count += pool.Size()
	}
	p.mu.RUnlock()

	return count
}

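// getPool returns the connection pool for the given host, if one exists.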
func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) {
	ip := host.Peer().String()
	p.mu.RLock()
	pool, ok = p.hostConnPools[ip]
	p.mu.RUnlock()
	return
}

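// Close shuts down and removes all host connection pools.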
func (p *policyConnPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	// close the pools
	for addr, pool := range p.hostConnPools {
		delete(p.hostConnPools, addr)
		pool.Close()
	}
}

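// addHost creates a connection pool for the host if one does not already
// exist, then fills it.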
func (p *policyConnPool) addHost(host *HostInfo) {
	ip := host.Peer().String()
	p.mu.Lock()
	pool, ok := p.hostConnPools[ip]
	if !ok {
		pool = newHostConnPool(
			p.session,
			host,
			host.Port(), // TODO: if port == 0 use pool.port?
			p.numConns,
			p.keyspace,
		)

		p.hostConnPools[ip] = pool
	}
	p.mu.Unlock()

	pool.fill()
}

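// removeHost removes the host's connection pool, closing it asynchronously.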
func (p *policyConnPool) removeHost(ip net.IP) {
	k := ip.String()
	p.mu.Lock()
	pool, ok := p.hostConnPools[k]
	if !ok {
		p.mu.Unlock()
		return
	}

	delete(p.hostConnPools, k)
	p.mu.Unlock()

	go pool.Close()
}

func (p *policyConnPool) hostUp(host *HostInfo) {
	// TODO(zariel): have a set of up hosts and down hosts, we can internally
	// detect down hosts, then try to reconnect to them.
	p.addHost(host)
}

func (p *policyConnPool) hostDown(ip net.IP) {
	// TODO(zariel): mark host as down so we can try to connect to it later, for
	// now just treat it as removed.
	p.removeHost(ip)
}

// hostConnPool is a connection pool for a single host.
// Connection selection is based on a provided ConnSelectionPolicy
type hostConnPool struct {
	session  *Session
	host     *HostInfo
	port     int
	addr     string
	size     int
	keyspace string
	// protection for conns, closed, filling
	mu      sync.RWMutex
	conns   []*Conn
	closed  bool
	filling bool
	pos     uint32
}

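// String implements fmt.Stringer, summarizing the pool state for debugging.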
func (h *hostConnPool) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[filling=%v closed=%v conns=%v size=%v host=%v]",
		h.filling, h.closed, len(h.conns), h.size, h.host)
}

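// newHostConnPool creates a new pool for a single host; no connections are
// opened until the pool is filled.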
func newHostConnPool(session *Session, host *HostInfo, port, size int,
	keyspace string) *hostConnPool {

	pool := &hostConnPool{
		session:  session,
		host:     host,
		port:     port,
		addr:     (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String(),
		size:     size,
		keyspace: keyspace,
		conns:    make([]*Conn, 0, size),
		filling:  false,
		closed:   false,
	}

	// the pool is not filled or connected
	return pool
}

// Pick returns the connection from this pool with the most available
// streams, or nil if no connection is available.
func (pool *hostConnPool) Pick() *Conn {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	if pool.closed {
		return nil
	}

	size := len(pool.conns)
	if size < pool.size {
		// try to fill the pool
		go pool.fill()

		if size == 0 {
			return nil
		}
	}

	pos := int(atomic.AddUint32(&pool.pos, 1) - 1)

	var (
		leastBusyConn    *Conn
		streamsAvailable int
	)

	// find the conn which has the most available streams; this is racy
	for i := 0; i < size; i++ {
		conn := pool.conns[(pos+i)%size]
		if streams := conn.AvailableStreams(); streams > streamsAvailable {
			leastBusyConn = conn
			streamsAvailable = streams
		}
	}

	return leastBusyConn
}

// Size returns the number of connections currently active in the pool
func (pool *hostConnPool) Size() int {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	return len(pool.conns)
}

// Close the connection pool
func (pool *hostConnPool) Close() {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	if pool.closed {
		return
	}
	pool.closed = true

	pool.drainLocked()
}

// Fill the connection pool
func (pool *hostConnPool) fill() {
	pool.mu.RLock()
	// avoid filling a closed pool, or concurrent filling
	if pool.closed || pool.filling {
		pool.mu.RUnlock()
		return
	}

	// determine the filling work to be done
	startCount := len(pool.conns)
	fillCount := pool.size - startCount

	// avoid filling a full (or overfull) pool
	if fillCount <= 0 {
		pool.mu.RUnlock()
		return
	}

	// switch from read to write lock
	pool.mu.RUnlock()
	pool.mu.Lock()

	// double check everything since the lock was released
	startCount = len(pool.conns)
	fillCount = pool.size - startCount
	if pool.closed || pool.filling || fillCount <= 0 {
		// looks like another goroutine already beat this
		// goroutine to the filling
		pool.mu.Unlock()
		return
	}

	// ok fill the pool
	pool.filling = true

	// allow others to access the pool while filling
	pool.mu.Unlock()

	// only this goroutine should make calls to fill/empty the pool at this
	// point until after this routine or its subordinates call
	// fillingStopped

	// fill only the first connection synchronously
	if startCount == 0 {
		err := pool.connect()
		pool.logConnectErr(err)

		if err != nil {
			// probably unreachable host
			pool.fillingStopped(true)

			// this is called with the connection pool mutex held; this call
			// will then recursively try to lock it again. FIXME
			go pool.session.handleNodeDown(pool.host.Peer(), pool.port)
			return
		}

		// filled one
		fillCount--
	}

	// fill the rest of the pool asynchronously
	go func() {
		err := pool.connectMany(fillCount)

		// mark the end of filling
		pool.fillingStopped(err != nil)
	}()
}

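// logConnectErr logs a connection error; dial/read failures expected during
// a node outage are suppressed unless debug logging is enabled.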
func (pool *hostConnPool) logConnectErr(err error) {
	if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
		// connection refused
		// these are typical during a node outage so avoid log spam.
		if gocqlDebug {
			log.Printf("unable to dial %q: %v\n", pool.host.Peer(), err)
		}
	} else if err != nil {
		// unexpected error
		log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
	}
}

// transition back to a not-filling state.
func (pool *hostConnPool) fillingStopped(hadError bool) {
	if hadError {
		// wait for some time to avoid back-to-back filling
		// this provides some time between failed attempts
		// to fill the pool for the host to recover
		time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
	}

	pool.mu.Lock()
	pool.filling = false
	pool.mu.Unlock()
}

// connectMany creates new connections concurrently.
func (pool *hostConnPool) connectMany(count int) error {
	if count == 0 {
		return nil
	}

	var (
		wg         sync.WaitGroup
		mu         sync.Mutex
		connectErr error
	)

	wg.Add(count)
	for i := 0; i < count; i++ {
		go func() {
			defer wg.Done()
			err := pool.connect()
			pool.logConnectErr(err)
			if err != nil {
				mu.Lock()
				connectErr = err
				mu.Unlock()
			}
		}()
	}

	// wait until all connection attempts are done
	wg.Wait()

	return connectErr
}

// create a new connection to the host and add it to the pool
func (pool *hostConnPool) connect() (err error) {
	// TODO: provide a more robust connection retry mechanism, we should also
	// be able to detect hosts that come up by trying to connect to downed ones.
	const maxAttempts = 3

	// try to connect
	var conn *Conn
	for i := 0; i < maxAttempts; i++ {
		conn, err = pool.session.connect(pool.host, pool)
		if err == nil {
			break
		}
		if opErr, isOpErr := err.(*net.OpError); isOpErr {
			// if the error is not temporary (e.g. network unreachable),
			// don't retry
			if !opErr.Temporary() {
				break
			}
		}
	}

	if err != nil {
		return err
	}

	if pool.keyspace != "" {
		// set the keyspace
		if err = conn.UseKeyspace(pool.keyspace); err != nil {
			conn.Close()
			return err
		}
	}

	// add the Conn to the pool
	pool.mu.Lock()
	defer pool.mu.Unlock()

	if pool.closed {
		conn.Close()
		return nil
	}

	pool.conns = append(pool.conns, conn)

	return nil
}

// handle any error from a Conn
func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
	if !closed {
		// still an open connection, so continue using it
		return
	}

	// TODO: track the number of errors per host and detect when a host is dead,
	// then also have something which can detect when a host comes back.
	pool.mu.Lock()
	defer pool.mu.Unlock()

	if pool.closed {
		// pool closed
		return
	}

	// find the connection index
	for i, candidate := range pool.conns {
		if candidate == conn {
			// remove the connection, not preserving order
			pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]

			// lost a connection, so fill the pool
			go pool.fill()
			break
		}
	}
}

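// drainLocked closes and removes every connection; the caller must hold
// pool.mu.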
func (pool *hostConnPool) drainLocked() {
	// empty the pool
	conns := pool.conns
	pool.conns = nil

	// close the connections
	for _, conn := range conns {
		conn.Close()
	}
}

// removes and closes all connections from the pool
func (pool *hostConnPool) drain() {
	pool.mu.Lock()
	defer pool.mu.Unlock()
	pool.drainLocked()
}