connectionpool.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "crypto/tls"
  7. "crypto/x509"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "sync"
  15. "time"
  16. )
  17. // interface to implement to receive the host information
  18. type SetHosts interface {
  19. SetHosts(hosts []HostInfo)
  20. }
  21. // interface to implement to receive the partitioner value
  22. type SetPartitioner interface {
  23. SetPartitioner(partitioner string)
  24. }
  25. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  26. // ca cert is optional
  27. if sslOpts.CaPath != "" {
  28. if sslOpts.RootCAs == nil {
  29. sslOpts.RootCAs = x509.NewCertPool()
  30. }
  31. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  32. if err != nil {
  33. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  34. }
  35. if !sslOpts.RootCAs.AppendCertsFromPEM(pem) {
  36. return nil, errors.New("connectionpool: failed parsing or CA certs")
  37. }
  38. }
  39. if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
  40. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  41. if err != nil {
  42. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  43. }
  44. sslOpts.Certificates = append(sslOpts.Certificates, mycert)
  45. }
  46. sslOpts.InsecureSkipVerify = !sslOpts.EnableHostVerification
  47. return &sslOpts.Config, nil
  48. }
  49. type policyConnPool struct {
  50. session *Session
  51. port int
  52. numConns int
  53. connCfg *ConnConfig
  54. keyspace string
  55. mu sync.RWMutex
  56. hostPolicy HostSelectionPolicy
  57. connPolicy func() ConnSelectionPolicy
  58. hostConnPools map[string]*hostConnPool
  59. }
  60. func newPolicyConnPool(session *Session, hostPolicy HostSelectionPolicy,
  61. connPolicy func() ConnSelectionPolicy) (*policyConnPool, error) {
  62. var (
  63. err error
  64. tlsConfig *tls.Config
  65. )
  66. cfg := session.cfg
  67. if cfg.SslOpts != nil {
  68. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  69. if err != nil {
  70. return nil, err
  71. }
  72. }
  73. // create the pool
  74. pool := &policyConnPool{
  75. session: session,
  76. port: cfg.Port,
  77. numConns: cfg.NumConns,
  78. connCfg: &ConnConfig{
  79. ProtoVersion: cfg.ProtoVersion,
  80. CQLVersion: cfg.CQLVersion,
  81. Timeout: cfg.Timeout,
  82. Compressor: cfg.Compressor,
  83. Authenticator: cfg.Authenticator,
  84. Keepalive: cfg.SocketKeepalive,
  85. tlsConfig: tlsConfig,
  86. },
  87. keyspace: cfg.Keyspace,
  88. hostPolicy: hostPolicy,
  89. connPolicy: connPolicy,
  90. hostConnPools: map[string]*hostConnPool{},
  91. }
  92. hosts := make([]HostInfo, len(cfg.Hosts))
  93. for i, hostAddr := range cfg.Hosts {
  94. hosts[i].Peer = hostAddr
  95. }
  96. pool.SetHosts(hosts)
  97. return pool, nil
  98. }
  99. func (p *policyConnPool) SetHosts(hosts []HostInfo) {
  100. p.mu.Lock()
  101. defer p.mu.Unlock()
  102. toRemove := make(map[string]struct{})
  103. for addr := range p.hostConnPools {
  104. toRemove[addr] = struct{}{}
  105. }
  106. // TODO connect to hosts in parallel, but wait for pools to be
  107. // created before returning
  108. for i := range hosts {
  109. pool, exists := p.hostConnPools[hosts[i].Peer]
  110. if !exists {
  111. // create a connection pool for the host
  112. pool = newHostConnPool(
  113. p.session,
  114. hosts[i].Peer,
  115. p.port,
  116. p.numConns,
  117. p.connCfg,
  118. p.keyspace,
  119. p.connPolicy(),
  120. )
  121. p.hostConnPools[hosts[i].Peer] = pool
  122. } else {
  123. // still have this host, so don't remove it
  124. delete(toRemove, hosts[i].Peer)
  125. }
  126. }
  127. for addr := range toRemove {
  128. pool := p.hostConnPools[addr]
  129. delete(p.hostConnPools, addr)
  130. pool.Close()
  131. }
  132. // update the policy
  133. p.hostPolicy.SetHosts(hosts)
  134. }
  135. func (p *policyConnPool) SetPartitioner(partitioner string) {
  136. p.hostPolicy.SetPartitioner(partitioner)
  137. }
  138. func (p *policyConnPool) Size() int {
  139. p.mu.RLock()
  140. count := 0
  141. for _, pool := range p.hostConnPools {
  142. count += pool.Size()
  143. }
  144. p.mu.RUnlock()
  145. return count
  146. }
  147. func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
  148. nextHost := p.hostPolicy.Pick(qry)
  149. var (
  150. host SelectedHost
  151. conn *Conn
  152. )
  153. p.mu.RLock()
  154. defer p.mu.RUnlock()
  155. for conn == nil {
  156. host = nextHost()
  157. if host == nil {
  158. break
  159. } else if host.Info() == nil {
  160. panic(fmt.Sprintf("policy %T returned no host info: %+v", p.hostPolicy, host))
  161. }
  162. pool, ok := p.hostConnPools[host.Info().Peer]
  163. if !ok {
  164. continue
  165. }
  166. conn = pool.Pick(qry)
  167. }
  168. return host, conn
  169. }
  170. func (p *policyConnPool) Close() {
  171. p.mu.Lock()
  172. defer p.mu.Unlock()
  173. // remove the hosts from the policy
  174. p.hostPolicy.SetHosts([]HostInfo{})
  175. // close the pools
  176. for addr, pool := range p.hostConnPools {
  177. delete(p.hostConnPools, addr)
  178. pool.Close()
  179. }
  180. }
  181. func (p *policyConnPool) addHost(host *HostInfo) {
  182. p.mu.Lock()
  183. defer p.mu.Unlock()
  184. pool, ok := p.hostConnPools[host.Peer]
  185. if ok {
  186. return
  187. }
  188. pool = newHostConnPool(
  189. p.session,
  190. host.Peer,
  191. p.port,
  192. p.numConns,
  193. p.connCfg,
  194. p.keyspace,
  195. p.connPolicy(),
  196. )
  197. p.hostConnPools[host.Peer] = pool
  198. }
  199. func (p *policyConnPool) removeHost(addr string) {
  200. p.mu.Lock()
  201. pool, ok := p.hostConnPools[addr]
  202. if !ok {
  203. p.mu.Unlock()
  204. return
  205. }
  206. delete(p.hostConnPools, addr)
  207. p.mu.Unlock()
  208. pool.Close()
  209. }
  210. // hostConnPool is a connection pool for a single host.
  211. // Connection selection is based on a provided ConnSelectionPolicy
  212. type hostConnPool struct {
  213. session *Session
  214. host string
  215. port int
  216. addr string
  217. size int
  218. connCfg *ConnConfig
  219. keyspace string
  220. policy ConnSelectionPolicy
  221. // protection for conns, closed, filling
  222. mu sync.RWMutex
  223. conns []*Conn
  224. closed bool
  225. filling bool
  226. }
  227. func newHostConnPool(session *Session, host string, port, size int, connCfg *ConnConfig,
  228. keyspace string, policy ConnSelectionPolicy) *hostConnPool {
  229. pool := &hostConnPool{
  230. session: session,
  231. host: host,
  232. port: port,
  233. addr: JoinHostPort(host, port),
  234. size: size,
  235. connCfg: connCfg,
  236. keyspace: keyspace,
  237. policy: policy,
  238. conns: make([]*Conn, 0, size),
  239. filling: false,
  240. closed: false,
  241. }
  242. // fill the pool with the initial connections before returning
  243. pool.fill()
  244. return pool
  245. }
  246. // Pick a connection from this connection pool for the given query.
  247. func (pool *hostConnPool) Pick(qry *Query) *Conn {
  248. pool.mu.RLock()
  249. if pool.closed {
  250. pool.mu.RUnlock()
  251. return nil
  252. }
  253. empty := len(pool.conns) == 0
  254. pool.mu.RUnlock()
  255. if empty {
  256. // try to fill the empty pool
  257. go pool.fill()
  258. return nil
  259. }
  260. return pool.policy.Pick(qry)
  261. }
  262. //Size returns the number of connections currently active in the pool
  263. func (pool *hostConnPool) Size() int {
  264. pool.mu.RLock()
  265. defer pool.mu.RUnlock()
  266. return len(pool.conns)
  267. }
  268. //Close the connection pool
  269. func (pool *hostConnPool) Close() {
  270. pool.mu.Lock()
  271. defer pool.mu.Unlock()
  272. if pool.closed {
  273. return
  274. }
  275. pool.closed = true
  276. // drain, but don't wait
  277. go pool.drain()
  278. }
  279. // Fill the connection pool
  280. func (pool *hostConnPool) fill() {
  281. pool.mu.RLock()
  282. // avoid filling a closed pool, or concurrent filling
  283. if pool.closed || pool.filling {
  284. pool.mu.RUnlock()
  285. return
  286. }
  287. // determine the filling work to be done
  288. startCount := len(pool.conns)
  289. fillCount := pool.size - startCount
  290. // avoid filling a full (or overfull) pool
  291. if fillCount <= 0 {
  292. pool.mu.RUnlock()
  293. return
  294. }
  295. // switch from read to write lock
  296. pool.mu.RUnlock()
  297. pool.mu.Lock()
  298. // double check everything since the lock was released
  299. startCount = len(pool.conns)
  300. fillCount = pool.size - startCount
  301. if pool.closed || pool.filling || fillCount <= 0 {
  302. // looks like another goroutine already beat this
  303. // goroutine to the filling
  304. pool.mu.Unlock()
  305. return
  306. }
  307. // ok fill the pool
  308. pool.filling = true
  309. // allow others to access the pool while filling
  310. pool.mu.Unlock()
  311. // only this goroutine should make calls to fill/empty the pool at this
  312. // point until after this routine or its subordinates calls
  313. // fillingStopped
  314. // fill only the first connection synchronously
  315. if startCount == 0 {
  316. err := pool.connect()
  317. pool.logConnectErr(err)
  318. if err != nil {
  319. // probably unreachable host
  320. go pool.fillingStopped()
  321. return
  322. }
  323. // filled one
  324. fillCount--
  325. // connect all connections to this host in sync
  326. for fillCount > 0 {
  327. err := pool.connect()
  328. pool.logConnectErr(err)
  329. // decrement, even on error
  330. fillCount--
  331. }
  332. go pool.fillingStopped()
  333. return
  334. }
  335. // fill the rest of the pool asynchronously
  336. go func() {
  337. for fillCount > 0 {
  338. err := pool.connect()
  339. pool.logConnectErr(err)
  340. // decrement, even on error
  341. fillCount--
  342. }
  343. // mark the end of filling
  344. pool.fillingStopped()
  345. }()
  346. }
  347. func (pool *hostConnPool) logConnectErr(err error) {
  348. if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
  349. // connection refused
  350. // these are typical during a node outage so avoid log spam.
  351. } else if err != nil {
  352. // unexpected error
  353. log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
  354. }
  355. }
  356. // transition back to a not-filling state.
  357. func (pool *hostConnPool) fillingStopped() {
  358. // wait for some time to avoid back-to-back filling
  359. // this provides some time between failed attempts
  360. // to fill the pool for the host to recover
  361. time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
  362. pool.mu.Lock()
  363. pool.filling = false
  364. pool.mu.Unlock()
  365. }
  366. // create a new connection to the host and add it to the pool
  367. func (pool *hostConnPool) connect() error {
  368. // try to connect
  369. conn, err := Connect(pool.addr, pool.connCfg, pool, pool.session)
  370. if err != nil {
  371. return err
  372. }
  373. if pool.keyspace != "" {
  374. // set the keyspace
  375. if err := conn.UseKeyspace(pool.keyspace); err != nil {
  376. conn.Close()
  377. return err
  378. }
  379. }
  380. // add the Conn to the pool
  381. pool.mu.Lock()
  382. defer pool.mu.Unlock()
  383. if pool.closed {
  384. conn.Close()
  385. return nil
  386. }
  387. pool.conns = append(pool.conns, conn)
  388. pool.policy.SetConns(pool.conns)
  389. return nil
  390. }
  391. // handle any error from a Conn
  392. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  393. if !closed {
  394. // still an open connection, so continue using it
  395. return
  396. }
  397. pool.mu.Lock()
  398. defer pool.mu.Unlock()
  399. if pool.closed {
  400. // pool closed
  401. return
  402. }
  403. // find the connection index
  404. for i, candidate := range pool.conns {
  405. if candidate == conn {
  406. // remove the connection, not preserving order
  407. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  408. // update the policy
  409. pool.policy.SetConns(pool.conns)
  410. // lost a connection, so fill the pool
  411. go pool.fill()
  412. break
  413. }
  414. }
  415. }
  416. // removes and closes all connections from the pool
  417. func (pool *hostConnPool) drain() {
  418. pool.mu.Lock()
  419. defer pool.mu.Unlock()
  420. // empty the pool
  421. conns := pool.conns
  422. pool.conns = pool.conns[:0]
  423. // update the policy
  424. pool.policy.SetConns(pool.conns)
  425. // close the connections
  426. for _, conn := range conns {
  427. conn.Close()
  428. }
  429. }