connectionpool.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607
  // Copyright (c) 2012 The gocql Authors. All rights reserved.
  // Use of this source code is governed by a BSD-style
  // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "crypto/tls"
  7. "crypto/x509"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "sync"
  15. "time"
  16. )
  17. // interface to implement to receive the host information
  18. type SetHosts interface {
  19. SetHosts(hosts []*HostInfo)
  20. }
  // SetPartitioner is the interface to implement in order to receive the
  // partitioner value.
  type SetPartitioner interface {
  	// SetPartitioner is handed the partitioner value as a string.
  	SetPartitioner(partitioner string)
  }
  25. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  26. // ca cert is optional
  27. if sslOpts.CaPath != "" {
  28. if sslOpts.RootCAs == nil {
  29. sslOpts.RootCAs = x509.NewCertPool()
  30. }
  31. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  32. if err != nil {
  33. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  34. }
  35. if !sslOpts.RootCAs.AppendCertsFromPEM(pem) {
  36. return nil, errors.New("connectionpool: failed parsing or CA certs")
  37. }
  38. }
  39. if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
  40. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  41. if err != nil {
  42. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  43. }
  44. sslOpts.Certificates = append(sslOpts.Certificates, mycert)
  45. }
  46. sslOpts.InsecureSkipVerify = !sslOpts.EnableHostVerification
  47. return &sslOpts.Config, nil
  48. }
  49. type policyConnPool struct {
  50. session *Session
  51. port int
  52. numConns int
  53. keyspace string
  54. mu sync.RWMutex
  55. hostPolicy HostSelectionPolicy
  56. connPolicy func() ConnSelectionPolicy
  57. hostConnPools map[string]*hostConnPool
  58. endpoints []string
  59. }
  60. func connConfig(session *Session) (*ConnConfig, error) {
  61. cfg := session.cfg
  62. var (
  63. err error
  64. tlsConfig *tls.Config
  65. )
  66. // TODO(zariel): move tls config setup into session init.
  67. if cfg.SslOpts != nil {
  68. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  69. if err != nil {
  70. return nil, err
  71. }
  72. }
  73. return &ConnConfig{
  74. ProtoVersion: cfg.ProtoVersion,
  75. CQLVersion: cfg.CQLVersion,
  76. Timeout: cfg.Timeout,
  77. Compressor: cfg.Compressor,
  78. Authenticator: cfg.Authenticator,
  79. Keepalive: cfg.SocketKeepalive,
  80. tlsConfig: tlsConfig,
  81. }, nil
  82. }
  83. func newPolicyConnPool(session *Session, hostPolicy HostSelectionPolicy,
  84. connPolicy func() ConnSelectionPolicy) *policyConnPool {
  85. // create the pool
  86. pool := &policyConnPool{
  87. session: session,
  88. port: session.cfg.Port,
  89. numConns: session.cfg.NumConns,
  90. keyspace: session.cfg.Keyspace,
  91. hostPolicy: hostPolicy,
  92. connPolicy: connPolicy,
  93. hostConnPools: map[string]*hostConnPool{},
  94. }
  95. pool.endpoints = make([]string, len(session.cfg.Hosts))
  96. copy(pool.endpoints, session.cfg.Hosts)
  97. return pool
  98. }
  99. func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
  100. p.mu.Lock()
  101. defer p.mu.Unlock()
  102. toRemove := make(map[string]struct{})
  103. for addr := range p.hostConnPools {
  104. toRemove[addr] = struct{}{}
  105. }
  106. pools := make(chan *hostConnPool)
  107. createCount := 0
  108. for _, host := range hosts {
  109. if !host.IsUp() {
  110. // don't create a connection pool for a down host
  111. continue
  112. }
  113. if _, exists := p.hostConnPools[host.Peer()]; exists {
  114. // still have this host, so don't remove it
  115. delete(toRemove, host.Peer())
  116. continue
  117. }
  118. createCount++
  119. go func(host *HostInfo) {
  120. // create a connection pool for the host
  121. pools <- newHostConnPool(
  122. p.session,
  123. host,
  124. p.port,
  125. p.numConns,
  126. p.keyspace,
  127. p.connPolicy(),
  128. )
  129. }(host)
  130. }
  131. // add created pools
  132. for createCount > 0 {
  133. pool := <-pools
  134. createCount--
  135. if pool.Size() > 0 {
  136. // add pool onyl if there a connections available
  137. p.hostConnPools[pool.host.Peer()] = pool
  138. }
  139. }
  140. for addr := range toRemove {
  141. pool := p.hostConnPools[addr]
  142. delete(p.hostConnPools, addr)
  143. go pool.Close()
  144. }
  145. // update the policy
  146. p.hostPolicy.SetHosts(hosts)
  147. }
  148. func (p *policyConnPool) SetPartitioner(partitioner string) {
  149. p.hostPolicy.SetPartitioner(partitioner)
  150. }
  151. func (p *policyConnPool) Size() int {
  152. p.mu.RLock()
  153. count := 0
  154. for _, pool := range p.hostConnPools {
  155. count += pool.Size()
  156. }
  157. p.mu.RUnlock()
  158. return count
  159. }
  160. func (p *policyConnPool) getPool(addr string) (pool *hostConnPool, ok bool) {
  161. p.mu.RLock()
  162. pool, ok = p.hostConnPools[addr]
  163. p.mu.RUnlock()
  164. return
  165. }
  166. func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
  167. nextHost := p.hostPolicy.Pick(qry)
  168. var (
  169. host SelectedHost
  170. conn *Conn
  171. )
  172. p.mu.RLock()
  173. defer p.mu.RUnlock()
  174. for conn == nil {
  175. host = nextHost()
  176. if host == nil {
  177. break
  178. } else if host.Info() == nil {
  179. panic(fmt.Sprintf("policy %T returned no host info: %+v", p.hostPolicy, host))
  180. }
  181. pool, ok := p.hostConnPools[host.Info().Peer()]
  182. if !ok {
  183. continue
  184. }
  185. conn = pool.Pick(qry)
  186. }
  187. return host, conn
  188. }
  189. func (p *policyConnPool) Close() {
  190. p.mu.Lock()
  191. defer p.mu.Unlock()
  192. // remove the hosts from the policy
  193. p.hostPolicy.SetHosts(nil)
  194. // close the pools
  195. for addr, pool := range p.hostConnPools {
  196. delete(p.hostConnPools, addr)
  197. pool.Close()
  198. }
  199. }
  200. func (p *policyConnPool) addHost(host *HostInfo) {
  201. p.mu.Lock()
  202. pool, ok := p.hostConnPools[host.Peer()]
  203. if !ok {
  204. pool = newHostConnPool(
  205. p.session,
  206. host,
  207. host.Port(), // TODO: if port == 0 use pool.port?
  208. p.numConns,
  209. p.keyspace,
  210. p.connPolicy(),
  211. )
  212. p.hostConnPools[host.Peer()] = pool
  213. }
  214. p.mu.Unlock()
  215. pool.fill()
  216. // update policy
  217. // TODO: policy should not have conns, it should have hosts and return a host
  218. // iter which the pool will use to serve conns
  219. p.hostPolicy.AddHost(host)
  220. }
  221. func (p *policyConnPool) removeHost(addr string) {
  222. p.hostPolicy.RemoveHost(addr)
  223. p.mu.Lock()
  224. pool, ok := p.hostConnPools[addr]
  225. if !ok {
  226. p.mu.Unlock()
  227. return
  228. }
  229. delete(p.hostConnPools, addr)
  230. p.mu.Unlock()
  231. go pool.Close()
  232. }
  233. func (p *policyConnPool) hostUp(host *HostInfo) {
  234. // TODO(zariel): have a set of up hosts and down hosts, we can internally
  235. // detect down hosts, then try to reconnect to them.
  236. p.addHost(host)
  237. }
  238. func (p *policyConnPool) hostDown(addr string) {
  239. // TODO(zariel): mark host as down so we can try to connect to it later, for
  240. // now just treat it has removed.
  241. p.removeHost(addr)
  242. }
  243. // hostConnPool is a connection pool for a single host.
  244. // Connection selection is based on a provided ConnSelectionPolicy
  245. type hostConnPool struct {
  246. session *Session
  247. host *HostInfo
  248. port int
  249. addr string
  250. size int
  251. keyspace string
  252. policy ConnSelectionPolicy
  253. // protection for conns, closed, filling
  254. mu sync.RWMutex
  255. conns []*Conn
  256. closed bool
  257. filling bool
  258. }
  259. func (h *hostConnPool) String() string {
  260. h.mu.RLock()
  261. defer h.mu.RUnlock()
  262. return fmt.Sprintf("[filling=%v closed=%v conns=%v size=%v host=%v]",
  263. h.filling, h.closed, len(h.conns), h.size, h.host)
  264. }
  265. func newHostConnPool(session *Session, host *HostInfo, port, size int,
  266. keyspace string, policy ConnSelectionPolicy) *hostConnPool {
  267. pool := &hostConnPool{
  268. session: session,
  269. host: host,
  270. port: port,
  271. addr: JoinHostPort(host.Peer(), port),
  272. size: size,
  273. keyspace: keyspace,
  274. policy: policy,
  275. conns: make([]*Conn, 0, size),
  276. filling: false,
  277. closed: false,
  278. }
  279. // the pool is not filled or connected
  280. return pool
  281. }
  282. // Pick a connection from this connection pool for the given query.
  283. func (pool *hostConnPool) Pick(qry *Query) *Conn {
  284. pool.mu.RLock()
  285. if pool.closed {
  286. pool.mu.RUnlock()
  287. return nil
  288. }
  289. size := len(pool.conns)
  290. pool.mu.RUnlock()
  291. if size < pool.size {
  292. // try to fill the pool
  293. go pool.fill()
  294. if size == 0 {
  295. return nil
  296. }
  297. }
  298. return pool.policy.Pick(qry)
  299. }
  300. //Size returns the number of connections currently active in the pool
  301. func (pool *hostConnPool) Size() int {
  302. pool.mu.RLock()
  303. defer pool.mu.RUnlock()
  304. return len(pool.conns)
  305. }
  306. //Close the connection pool
  307. func (pool *hostConnPool) Close() {
  308. pool.mu.Lock()
  309. defer pool.mu.Unlock()
  310. if pool.closed {
  311. return
  312. }
  313. pool.closed = true
  314. pool.drainLocked()
  315. }
  316. // Fill the connection pool
  317. func (pool *hostConnPool) fill() {
  318. pool.mu.RLock()
  319. // avoid filling a closed pool, or concurrent filling
  320. if pool.closed || pool.filling {
  321. pool.mu.RUnlock()
  322. return
  323. }
  324. // determine the filling work to be done
  325. startCount := len(pool.conns)
  326. fillCount := pool.size - startCount
  327. // avoid filling a full (or overfull) pool
  328. if fillCount <= 0 {
  329. pool.mu.RUnlock()
  330. return
  331. }
  332. // switch from read to write lock
  333. pool.mu.RUnlock()
  334. pool.mu.Lock()
  335. // double check everything since the lock was released
  336. startCount = len(pool.conns)
  337. fillCount = pool.size - startCount
  338. if pool.closed || pool.filling || fillCount <= 0 {
  339. // looks like another goroutine already beat this
  340. // goroutine to the filling
  341. pool.mu.Unlock()
  342. return
  343. }
  344. // ok fill the pool
  345. pool.filling = true
  346. // allow others to access the pool while filling
  347. pool.mu.Unlock()
  348. // only this goroutine should make calls to fill/empty the pool at this
  349. // point until after this routine or its subordinates calls
  350. // fillingStopped
  351. // fill only the first connection synchronously
  352. if startCount == 0 {
  353. err := pool.connect()
  354. pool.logConnectErr(err)
  355. if err != nil {
  356. // probably unreachable host
  357. pool.fillingStopped(true)
  358. // this is calle with the connetion pool mutex held, this call will
  359. // then recursivly try to lock it again. FIXME
  360. go pool.session.handleNodeDown(net.ParseIP(pool.host.Peer()), pool.port)
  361. return
  362. }
  363. // filled one
  364. fillCount--
  365. }
  366. // fill the rest of the pool asynchronously
  367. go func() {
  368. err := pool.connectMany(fillCount)
  369. // mark the end of filling
  370. pool.fillingStopped(err != nil)
  371. }()
  372. }
  373. func (pool *hostConnPool) logConnectErr(err error) {
  374. if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
  375. // connection refused
  376. // these are typical during a node outage so avoid log spam.
  377. if gocqlDebug {
  378. log.Printf("unable to dial %q: %v\n", pool.host.Peer(), err)
  379. }
  380. } else if err != nil {
  381. // unexpected error
  382. log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
  383. }
  384. }
  385. // transition back to a not-filling state.
  386. func (pool *hostConnPool) fillingStopped(hadError bool) {
  387. if hadError {
  388. // wait for some time to avoid back-to-back filling
  389. // this provides some time between failed attempts
  390. // to fill the pool for the host to recover
  391. time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
  392. }
  393. pool.mu.Lock()
  394. pool.filling = false
  395. pool.mu.Unlock()
  396. }
  397. // connectMany creates new connections concurrent.
  398. func (pool *hostConnPool) connectMany(count int) error {
  399. if count == 0 {
  400. return nil
  401. }
  402. var (
  403. wg sync.WaitGroup
  404. mu sync.Mutex
  405. connectErr error
  406. )
  407. wg.Add(count)
  408. for i := 0; i < count; i++ {
  409. go func() {
  410. defer wg.Done()
  411. err := pool.connect()
  412. pool.logConnectErr(err)
  413. if err != nil {
  414. mu.Lock()
  415. connectErr = err
  416. mu.Unlock()
  417. }
  418. }()
  419. }
  420. // wait for all connections are done
  421. wg.Wait()
  422. return connectErr
  423. }
  424. // create a new connection to the host and add it to the pool
  425. func (pool *hostConnPool) connect() (err error) {
  426. // TODO: provide a more robust connection retry mechanism, we should also
  427. // be able to detect hosts that come up by trying to connect to downed ones.
  428. const maxAttempts = 3
  429. // try to connect
  430. var conn *Conn
  431. for i := 0; i < maxAttempts; i++ {
  432. conn, err = pool.session.connect(pool.addr, pool, pool.host)
  433. if err == nil {
  434. break
  435. }
  436. }
  437. if err != nil {
  438. return err
  439. }
  440. if pool.keyspace != "" {
  441. // set the keyspace
  442. if err = conn.UseKeyspace(pool.keyspace); err != nil {
  443. conn.Close()
  444. return err
  445. }
  446. }
  447. // add the Conn to the pool
  448. pool.mu.Lock()
  449. defer pool.mu.Unlock()
  450. if pool.closed {
  451. conn.Close()
  452. return nil
  453. }
  454. pool.conns = append(pool.conns, conn)
  455. conns := make([]*Conn, len(pool.conns))
  456. copy(conns, pool.conns)
  457. pool.policy.SetConns(conns)
  458. return nil
  459. }
  460. // handle any error from a Conn
  461. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  462. if !closed {
  463. // still an open connection, so continue using it
  464. return
  465. }
  466. // TODO: track the number of errors per host and detect when a host is dead,
  467. // then also have something which can detect when a host comes back.
  468. pool.mu.Lock()
  469. defer pool.mu.Unlock()
  470. if pool.closed {
  471. // pool closed
  472. return
  473. }
  474. // find the connection index
  475. for i, candidate := range pool.conns {
  476. if candidate == conn {
  477. // remove the connection, not preserving order
  478. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  479. // update the policy
  480. conns := make([]*Conn, len(pool.conns))
  481. copy(conns, pool.conns)
  482. pool.policy.SetConns(conns)
  483. // lost a connection, so fill the pool
  484. go pool.fill()
  485. break
  486. }
  487. }
  488. }
  489. func (pool *hostConnPool) drainLocked() {
  490. // empty the pool
  491. conns := pool.conns
  492. pool.conns = nil
  493. // update the policy
  494. pool.policy.SetConns(nil)
  495. // close the connections
  496. for _, conn := range conns {
  497. conn.Close()
  498. }
  499. }
  500. // removes and closes all connections from the pool
  501. func (pool *hostConnPool) drain() {
  502. pool.mu.Lock()
  503. defer pool.mu.Unlock()
  504. pool.drainLocked()
  505. }