connectionpool.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "crypto/tls"
  7. "crypto/x509"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "sync"
  15. "time"
  16. )
// SetHosts is the interface to implement to receive the host information.
type SetHosts interface {
	// SetHosts is handed the list of hosts.
	SetHosts(hosts []*HostInfo)
}
// SetPartitioner is the interface to implement to receive the partitioner
// value.
type SetPartitioner interface {
	// SetPartitioner is handed the partitioner value as a string.
	SetPartitioner(partitioner string)
}
  25. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  26. // ca cert is optional
  27. if sslOpts.CaPath != "" {
  28. if sslOpts.RootCAs == nil {
  29. sslOpts.RootCAs = x509.NewCertPool()
  30. }
  31. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  32. if err != nil {
  33. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  34. }
  35. if !sslOpts.RootCAs.AppendCertsFromPEM(pem) {
  36. return nil, errors.New("connectionpool: failed parsing or CA certs")
  37. }
  38. }
  39. if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
  40. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  41. if err != nil {
  42. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  43. }
  44. sslOpts.Certificates = append(sslOpts.Certificates, mycert)
  45. }
  46. sslOpts.InsecureSkipVerify = !sslOpts.EnableHostVerification
  47. return &sslOpts.Config, nil
  48. }
// policyConnPool keeps one hostConnPool per host, keyed by the host's peer
// address, and asks a HostSelectionPolicy which host to use for a query.
type policyConnPool struct {
	session *Session

	// values passed on to every hostConnPool this pool creates
	port     int
	numConns int
	connCfg  *ConnConfig
	keyspace string

	// mu guards hostConnPools
	mu         sync.RWMutex
	hostPolicy HostSelectionPolicy
	// connPolicy is called once for each new host pool to obtain the
	// ConnSelectionPolicy that pool will use
	connPolicy    func() ConnSelectionPolicy
	hostConnPools map[string]*hostConnPool

	// endpoints is a copy of the session's configured hosts, taken when the
	// pool is created
	endpoints []string
}
  61. func connConfig(session *Session) (*ConnConfig, error) {
  62. cfg := session.cfg
  63. var (
  64. err error
  65. tlsConfig *tls.Config
  66. )
  67. // TODO(zariel): move tls config setup into session init.
  68. if cfg.SslOpts != nil {
  69. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  70. if err != nil {
  71. return nil, err
  72. }
  73. }
  74. return &ConnConfig{
  75. ProtoVersion: cfg.ProtoVersion,
  76. CQLVersion: cfg.CQLVersion,
  77. Timeout: cfg.Timeout,
  78. Compressor: cfg.Compressor,
  79. Authenticator: cfg.Authenticator,
  80. Keepalive: cfg.SocketKeepalive,
  81. tlsConfig: tlsConfig,
  82. }, nil
  83. }
  84. func newPolicyConnPool(session *Session, hostPolicy HostSelectionPolicy,
  85. connPolicy func() ConnSelectionPolicy) (*policyConnPool, error) {
  86. connCfg, err := connConfig(session)
  87. if err != nil {
  88. return nil, err
  89. }
  90. // create the pool
  91. pool := &policyConnPool{
  92. session: session,
  93. port: session.cfg.Port,
  94. numConns: session.cfg.NumConns,
  95. connCfg: connCfg,
  96. keyspace: session.cfg.Keyspace,
  97. hostPolicy: hostPolicy,
  98. connPolicy: connPolicy,
  99. hostConnPools: map[string]*hostConnPool{},
  100. }
  101. pool.endpoints = make([]string, len(session.cfg.Hosts))
  102. copy(pool.endpoints, session.cfg.Hosts)
  103. return pool, nil
  104. }
  105. func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
  106. p.mu.Lock()
  107. defer p.mu.Unlock()
  108. toRemove := make(map[string]struct{})
  109. for addr := range p.hostConnPools {
  110. toRemove[addr] = struct{}{}
  111. }
  112. // TODO connect to hosts in parallel, but wait for pools to be
  113. // created before returning
  114. for _, host := range hosts {
  115. pool, exists := p.hostConnPools[host.Peer()]
  116. if !exists {
  117. // create a connection pool for the host
  118. pool = newHostConnPool(
  119. p.session,
  120. host.Peer(),
  121. p.port,
  122. p.numConns,
  123. p.connCfg,
  124. p.keyspace,
  125. p.connPolicy(),
  126. )
  127. p.hostConnPools[host.Peer()] = pool
  128. } else {
  129. // still have this host, so don't remove it
  130. delete(toRemove, host.Peer())
  131. }
  132. }
  133. for addr := range toRemove {
  134. pool := p.hostConnPools[addr]
  135. delete(p.hostConnPools, addr)
  136. pool.Close()
  137. }
  138. // update the policy
  139. p.hostPolicy.SetHosts(hosts)
  140. }
// SetPartitioner forwards the partitioner value to the host selection policy.
func (p *policyConnPool) SetPartitioner(partitioner string) {
	p.hostPolicy.SetPartitioner(partitioner)
}
  144. func (p *policyConnPool) Size() int {
  145. p.mu.RLock()
  146. count := 0
  147. for _, pool := range p.hostConnPools {
  148. count += pool.Size()
  149. }
  150. p.mu.RUnlock()
  151. return count
  152. }
  153. func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
  154. nextHost := p.hostPolicy.Pick(qry)
  155. var (
  156. host SelectedHost
  157. conn *Conn
  158. )
  159. p.mu.RLock()
  160. defer p.mu.RUnlock()
  161. for conn == nil {
  162. host = nextHost()
  163. if host == nil {
  164. break
  165. } else if host.Info() == nil {
  166. panic(fmt.Sprintf("policy %T returned no host info: %+v", p.hostPolicy, host))
  167. }
  168. pool, ok := p.hostConnPools[host.Info().Peer()]
  169. if !ok {
  170. continue
  171. }
  172. conn = pool.Pick(qry)
  173. }
  174. return host, conn
  175. }
  176. func (p *policyConnPool) Close() {
  177. p.mu.Lock()
  178. defer p.mu.Unlock()
  179. // remove the hosts from the policy
  180. p.hostPolicy.SetHosts(nil)
  181. // close the pools
  182. for addr, pool := range p.hostConnPools {
  183. delete(p.hostConnPools, addr)
  184. pool.Close()
  185. }
  186. }
  187. func (p *policyConnPool) addHost(host *HostInfo) {
  188. p.mu.Lock()
  189. defer p.mu.Unlock()
  190. pool, ok := p.hostConnPools[host.Peer()]
  191. if ok {
  192. return
  193. }
  194. pool = newHostConnPool(
  195. p.session,
  196. host.Peer(),
  197. p.port,
  198. p.numConns,
  199. p.connCfg,
  200. p.keyspace,
  201. p.connPolicy(),
  202. )
  203. p.hostConnPools[host.Peer()] = pool
  204. }
  205. func (p *policyConnPool) removeHost(addr string) {
  206. p.hostPolicy.RemoveHost(addr)
  207. p.mu.Lock()
  208. pool, ok := p.hostConnPools[addr]
  209. if !ok {
  210. p.mu.Unlock()
  211. return
  212. }
  213. delete(p.hostConnPools, addr)
  214. p.mu.Unlock()
  215. pool.Close()
  216. }
// hostUp registers host with the pool by delegating to addHost.
func (p *policyConnPool) hostUp(host *HostInfo) {
	// TODO(zariel): have a set of up hosts and down hosts, we can internally
	// detect down hosts, then try to reconnect to them.
	p.addHost(host)
}
// hostDown drops the host at addr from the pool by delegating to removeHost.
func (p *policyConnPool) hostDown(addr string) {
	// TODO(zariel): mark host as down so we can try to connect to it later, for
	// now just treat it as removed.
	p.removeHost(addr)
}
// hostConnPool is a connection pool for a single host.
// Connection selection is based on a provided ConnSelectionPolicy
type hostConnPool struct {
	session *Session
	host    string
	port    int
	// addr is host and port joined with JoinHostPort; it is the address
	// connections are opened to
	addr string
	// size is the number of connections fill tries to reach
	size     int
	connCfg  *ConnConfig
	keyspace string
	// policy is given the current connection slice whenever it changes and
	// picks the connection used for a query
	policy ConnSelectionPolicy
	// protection for conns, closed, filling
	mu      sync.RWMutex
	conns   []*Conn
	closed  bool
	filling bool
}
  244. func newHostConnPool(session *Session, host string, port, size int, connCfg *ConnConfig,
  245. keyspace string, policy ConnSelectionPolicy) *hostConnPool {
  246. pool := &hostConnPool{
  247. session: session,
  248. host: host,
  249. port: port,
  250. addr: JoinHostPort(host, port),
  251. size: size,
  252. connCfg: connCfg,
  253. keyspace: keyspace,
  254. policy: policy,
  255. conns: make([]*Conn, 0, size),
  256. filling: false,
  257. closed: false,
  258. }
  259. // fill the pool with the initial connections before returning
  260. pool.fill()
  261. return pool
  262. }
  263. // Pick a connection from this connection pool for the given query.
  264. func (pool *hostConnPool) Pick(qry *Query) *Conn {
  265. pool.mu.RLock()
  266. if pool.closed {
  267. pool.mu.RUnlock()
  268. return nil
  269. }
  270. empty := len(pool.conns) == 0
  271. pool.mu.RUnlock()
  272. if empty {
  273. // try to fill the empty pool
  274. go pool.fill()
  275. return nil
  276. }
  277. return pool.policy.Pick(qry)
  278. }
  279. //Size returns the number of connections currently active in the pool
  280. func (pool *hostConnPool) Size() int {
  281. pool.mu.RLock()
  282. defer pool.mu.RUnlock()
  283. return len(pool.conns)
  284. }
  285. //Close the connection pool
  286. func (pool *hostConnPool) Close() {
  287. pool.mu.Lock()
  288. defer pool.mu.Unlock()
  289. if pool.closed {
  290. return
  291. }
  292. pool.closed = true
  293. // drain, but don't wait
  294. go pool.drain()
  295. }
  296. // Fill the connection pool
  297. func (pool *hostConnPool) fill() {
  298. pool.mu.RLock()
  299. // avoid filling a closed pool, or concurrent filling
  300. if pool.closed || pool.filling {
  301. pool.mu.RUnlock()
  302. return
  303. }
  304. // determine the filling work to be done
  305. startCount := len(pool.conns)
  306. fillCount := pool.size - startCount
  307. // avoid filling a full (or overfull) pool
  308. if fillCount <= 0 {
  309. pool.mu.RUnlock()
  310. return
  311. }
  312. // switch from read to write lock
  313. pool.mu.RUnlock()
  314. pool.mu.Lock()
  315. // double check everything since the lock was released
  316. startCount = len(pool.conns)
  317. fillCount = pool.size - startCount
  318. if pool.closed || pool.filling || fillCount <= 0 {
  319. // looks like another goroutine already beat this
  320. // goroutine to the filling
  321. pool.mu.Unlock()
  322. return
  323. }
  324. // ok fill the pool
  325. pool.filling = true
  326. // allow others to access the pool while filling
  327. pool.mu.Unlock()
  328. // only this goroutine should make calls to fill/empty the pool at this
  329. // point until after this routine or its subordinates calls
  330. // fillingStopped
  331. // fill only the first connection synchronously
  332. if startCount == 0 {
  333. err := pool.connect()
  334. pool.logConnectErr(err)
  335. if err != nil {
  336. // probably unreachable host
  337. go pool.fillingStopped()
  338. return
  339. }
  340. // filled one
  341. fillCount--
  342. // connect all connections to this host in sync
  343. for fillCount > 0 {
  344. err := pool.connect()
  345. pool.logConnectErr(err)
  346. // decrement, even on error
  347. fillCount--
  348. }
  349. go pool.fillingStopped()
  350. return
  351. }
  352. // fill the rest of the pool asynchronously
  353. go func() {
  354. for fillCount > 0 {
  355. err := pool.connect()
  356. pool.logConnectErr(err)
  357. // decrement, even on error
  358. fillCount--
  359. }
  360. // mark the end of filling
  361. pool.fillingStopped()
  362. }()
  363. }
  364. func (pool *hostConnPool) logConnectErr(err error) {
  365. if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
  366. // connection refused
  367. // these are typical during a node outage so avoid log spam.
  368. } else if err != nil {
  369. // unexpected error
  370. log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
  371. }
  372. }
  373. // transition back to a not-filling state.
  374. func (pool *hostConnPool) fillingStopped() {
  375. // wait for some time to avoid back-to-back filling
  376. // this provides some time between failed attempts
  377. // to fill the pool for the host to recover
  378. time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
  379. pool.mu.Lock()
  380. pool.filling = false
  381. pool.mu.Unlock()
  382. }
  383. // create a new connection to the host and add it to the pool
  384. func (pool *hostConnPool) connect() error {
  385. // try to connect
  386. conn, err := Connect(pool.addr, pool.connCfg, pool, pool.session)
  387. if err != nil {
  388. return err
  389. }
  390. if pool.keyspace != "" {
  391. // set the keyspace
  392. if err := conn.UseKeyspace(pool.keyspace); err != nil {
  393. conn.Close()
  394. return err
  395. }
  396. }
  397. // add the Conn to the pool
  398. pool.mu.Lock()
  399. defer pool.mu.Unlock()
  400. if pool.closed {
  401. conn.Close()
  402. return nil
  403. }
  404. pool.conns = append(pool.conns, conn)
  405. pool.policy.SetConns(pool.conns)
  406. return nil
  407. }
  408. // handle any error from a Conn
  409. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  410. if !closed {
  411. // still an open connection, so continue using it
  412. return
  413. }
  414. pool.mu.Lock()
  415. defer pool.mu.Unlock()
  416. if pool.closed {
  417. // pool closed
  418. return
  419. }
  420. // find the connection index
  421. for i, candidate := range pool.conns {
  422. if candidate == conn {
  423. // remove the connection, not preserving order
  424. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  425. // update the policy
  426. pool.policy.SetConns(pool.conns)
  427. // lost a connection, so fill the pool
  428. go pool.fill()
  429. break
  430. }
  431. }
  432. }
  433. // removes and closes all connections from the pool
  434. func (pool *hostConnPool) drain() {
  435. pool.mu.Lock()
  436. defer pool.mu.Unlock()
  437. // empty the pool
  438. conns := pool.conns
  439. pool.conns = pool.conns[:0:0]
  440. // update the policy
  441. pool.policy.SetConns(pool.conns)
  442. // close the connections
  443. for _, conn := range conns {
  444. conn.Close()
  445. }
  446. }