connectionpool.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "crypto/tls"
  7. "crypto/x509"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "sync"
  15. "time"
  16. )
  17. // interface to implement to receive the host information
  18. type SetHosts interface {
  19. SetHosts(hosts []HostInfo)
  20. }
  21. // interface to implement to receive the partitioner value
  22. type SetPartitioner interface {
  23. SetPartitioner(partitioner string)
  24. }
  25. func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
  26. // ca cert is optional
  27. if sslOpts.CaPath != "" {
  28. if sslOpts.RootCAs == nil {
  29. sslOpts.RootCAs = x509.NewCertPool()
  30. }
  31. pem, err := ioutil.ReadFile(sslOpts.CaPath)
  32. if err != nil {
  33. return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
  34. }
  35. if !sslOpts.RootCAs.AppendCertsFromPEM(pem) {
  36. return nil, errors.New("connectionpool: failed parsing or CA certs")
  37. }
  38. }
  39. if sslOpts.CertPath != "" || sslOpts.KeyPath != "" {
  40. mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
  41. if err != nil {
  42. return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
  43. }
  44. sslOpts.Certificates = append(sslOpts.Certificates, mycert)
  45. }
  46. sslOpts.InsecureSkipVerify = !sslOpts.EnableHostVerification
  47. return &sslOpts.Config, nil
  48. }
  49. type policyConnPool struct {
  50. port int
  51. numConns int
  52. connCfg *ConnConfig
  53. keyspace string
  54. mu sync.RWMutex
  55. hostPolicy HostSelectionPolicy
  56. connPolicy func() ConnSelectionPolicy
  57. hostConnPools map[string]*hostConnPool
  58. }
  59. func newPolicyConnPool(cfg *ClusterConfig, hostPolicy HostSelectionPolicy,
  60. connPolicy func() ConnSelectionPolicy) (*policyConnPool, error) {
  61. var (
  62. err error
  63. tlsConfig *tls.Config
  64. )
  65. if cfg.SslOpts != nil {
  66. tlsConfig, err = setupTLSConfig(cfg.SslOpts)
  67. if err != nil {
  68. return nil, err
  69. }
  70. }
  71. // create the pool
  72. pool := &policyConnPool{
  73. port: cfg.Port,
  74. numConns: cfg.NumConns,
  75. connCfg: &ConnConfig{
  76. ProtoVersion: cfg.ProtoVersion,
  77. CQLVersion: cfg.CQLVersion,
  78. Timeout: cfg.Timeout,
  79. NumStreams: cfg.NumStreams,
  80. Compressor: cfg.Compressor,
  81. Authenticator: cfg.Authenticator,
  82. Keepalive: cfg.SocketKeepalive,
  83. tlsConfig: tlsConfig,
  84. },
  85. keyspace: cfg.Keyspace,
  86. hostPolicy: hostPolicy,
  87. connPolicy: connPolicy,
  88. hostConnPools: map[string]*hostConnPool{},
  89. }
  90. hosts := make([]HostInfo, len(cfg.Hosts))
  91. for i, hostAddr := range cfg.Hosts {
  92. hosts[i].Peer = hostAddr
  93. }
  94. pool.SetHosts(hosts)
  95. return pool, nil
  96. }
  97. func (p *policyConnPool) SetHosts(hosts []HostInfo) {
  98. p.mu.Lock()
  99. toRemove := make(map[string]struct{})
  100. for addr := range p.hostConnPools {
  101. toRemove[addr] = struct{}{}
  102. }
  103. // TODO connect to hosts in parallel, but wait for pools to be
  104. // created before returning
  105. for i := range hosts {
  106. pool, exists := p.hostConnPools[hosts[i].Peer]
  107. if !exists {
  108. // create a connection pool for the host
  109. pool = newHostConnPool(
  110. hosts[i].Peer,
  111. p.port,
  112. p.numConns,
  113. p.connCfg,
  114. p.keyspace,
  115. p.connPolicy(),
  116. )
  117. p.hostConnPools[hosts[i].Peer] = pool
  118. } else {
  119. // still have this host, so don't remove it
  120. delete(toRemove, hosts[i].Peer)
  121. }
  122. }
  123. for addr := range toRemove {
  124. pool := p.hostConnPools[addr]
  125. delete(p.hostConnPools, addr)
  126. pool.Close()
  127. }
  128. // update the policy
  129. p.hostPolicy.SetHosts(hosts)
  130. p.mu.Unlock()
  131. }
  132. func (p *policyConnPool) SetPartitioner(partitioner string) {
  133. p.hostPolicy.SetPartitioner(partitioner)
  134. }
  135. func (p *policyConnPool) Size() int {
  136. p.mu.RLock()
  137. count := 0
  138. for _, pool := range p.hostConnPools {
  139. count += pool.Size()
  140. }
  141. p.mu.RUnlock()
  142. return count
  143. }
  144. func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
  145. nextHost := p.hostPolicy.Pick(qry)
  146. var (
  147. host SelectedHost
  148. conn *Conn
  149. )
  150. p.mu.RLock()
  151. for conn == nil {
  152. host = nextHost()
  153. if host == nil {
  154. break
  155. }
  156. conn = p.hostConnPools[host.Info().Peer].Pick(qry)
  157. }
  158. p.mu.RUnlock()
  159. return host, conn
  160. }
  161. func (p *policyConnPool) Close() {
  162. p.mu.Lock()
  163. // remove the hosts from the policy
  164. p.hostPolicy.SetHosts([]HostInfo{})
  165. // close the pools
  166. for addr, pool := range p.hostConnPools {
  167. delete(p.hostConnPools, addr)
  168. pool.Close()
  169. }
  170. p.mu.Unlock()
  171. }
  172. // hostConnPool is a connection pool for a single host.
  173. // Connection selection is based on a provided ConnSelectionPolicy
  174. type hostConnPool struct {
  175. host string
  176. port int
  177. addr string
  178. size int
  179. connCfg *ConnConfig
  180. keyspace string
  181. policy ConnSelectionPolicy
  182. // protection for conns, closed, filling
  183. mu sync.RWMutex
  184. conns []*Conn
  185. closed bool
  186. filling bool
  187. }
  188. func newHostConnPool(host string, port int, size int, connCfg *ConnConfig,
  189. keyspace string, policy ConnSelectionPolicy) *hostConnPool {
  190. pool := &hostConnPool{
  191. host: host,
  192. port: port,
  193. addr: JoinHostPort(host, port),
  194. size: size,
  195. connCfg: connCfg,
  196. keyspace: keyspace,
  197. policy: policy,
  198. conns: make([]*Conn, 0, size),
  199. filling: false,
  200. closed: false,
  201. }
  202. // fill the pool with the initial connections before returning
  203. pool.fill()
  204. return pool
  205. }
  206. // Pick a connection from this connection pool for the given query.
  207. func (pool *hostConnPool) Pick(qry *Query) *Conn {
  208. pool.mu.RLock()
  209. if pool.closed {
  210. pool.mu.RUnlock()
  211. return nil
  212. }
  213. empty := len(pool.conns) == 0
  214. pool.mu.RUnlock()
  215. if empty {
  216. // try to fill the empty pool
  217. go pool.fill()
  218. return nil
  219. }
  220. return pool.policy.Pick(qry)
  221. }
  222. //Size returns the number of connections currently active in the pool
  223. func (pool *hostConnPool) Size() int {
  224. pool.mu.RLock()
  225. defer pool.mu.RUnlock()
  226. return len(pool.conns)
  227. }
  228. //Close the connection pool
  229. func (pool *hostConnPool) Close() {
  230. pool.mu.Lock()
  231. defer pool.mu.Unlock()
  232. if pool.closed {
  233. return
  234. }
  235. pool.closed = true
  236. // drain, but don't wait
  237. go pool.drain()
  238. }
  239. // Fill the connection pool
  240. func (pool *hostConnPool) fill() {
  241. pool.mu.RLock()
  242. // avoid filling a closed pool, or concurrent filling
  243. if pool.closed || pool.filling {
  244. pool.mu.RUnlock()
  245. return
  246. }
  247. // determine the filling work to be done
  248. startCount := len(pool.conns)
  249. fillCount := pool.size - startCount
  250. // avoid filling a full (or overfull) pool
  251. if fillCount <= 0 {
  252. pool.mu.RUnlock()
  253. return
  254. }
  255. // switch from read to write lock
  256. pool.mu.RUnlock()
  257. pool.mu.Lock()
  258. // double check everything since the lock was released
  259. startCount = len(pool.conns)
  260. fillCount = pool.size - startCount
  261. if pool.closed || pool.filling || fillCount <= 0 {
  262. // looks like another goroutine already beat this
  263. // goroutine to the filling
  264. pool.mu.Unlock()
  265. return
  266. }
  267. // ok fill the pool
  268. pool.filling = true
  269. // allow others to access the pool while filling
  270. pool.mu.Unlock()
  271. // only this goroutine should make calls to fill/empty the pool at this
  272. // point until after this routine or its subordinates calls
  273. // fillingStopped
  274. // fill only the first connection synchronously
  275. if startCount == 0 {
  276. err := pool.connect()
  277. pool.logConnectErr(err)
  278. if err != nil {
  279. // probably unreachable host
  280. go pool.fillingStopped()
  281. return
  282. }
  283. // filled one
  284. fillCount--
  285. // connect all connections to this host in sync
  286. for fillCount > 0 {
  287. err := pool.connect()
  288. pool.logConnectErr(err)
  289. // decrement, even on error
  290. fillCount--
  291. }
  292. go pool.fillingStopped()
  293. return
  294. }
  295. // fill the rest of the pool asynchronously
  296. go func() {
  297. for fillCount > 0 {
  298. err := pool.connect()
  299. pool.logConnectErr(err)
  300. // decrement, even on error
  301. fillCount--
  302. }
  303. // mark the end of filling
  304. pool.fillingStopped()
  305. }()
  306. }
  307. func (pool *hostConnPool) logConnectErr(err error) {
  308. if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
  309. // connection refused
  310. // these are typical during a node outage so avoid log spam.
  311. } else if err != nil {
  312. // unexpected error
  313. log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)
  314. }
  315. }
  316. // transition back to a not-filling state.
  317. func (pool *hostConnPool) fillingStopped() {
  318. // wait for some time to avoid back-to-back filling
  319. // this provides some time between failed attempts
  320. // to fill the pool for the host to recover
  321. time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
  322. pool.mu.Lock()
  323. pool.filling = false
  324. pool.mu.Unlock()
  325. }
  326. // create a new connection to the host and add it to the pool
  327. func (pool *hostConnPool) connect() error {
  328. // try to connect
  329. conn, err := Connect(pool.addr, pool.connCfg, pool)
  330. if err != nil {
  331. return err
  332. }
  333. if pool.keyspace != "" {
  334. // set the keyspace
  335. if err := conn.UseKeyspace(pool.keyspace); err != nil {
  336. conn.Close()
  337. return err
  338. }
  339. }
  340. // add the Conn to the pool
  341. pool.mu.Lock()
  342. defer pool.mu.Unlock()
  343. if pool.closed {
  344. conn.Close()
  345. return nil
  346. }
  347. pool.conns = append(pool.conns, conn)
  348. pool.policy.SetConns(pool.conns)
  349. return nil
  350. }
  351. // handle any error from a Conn
  352. func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
  353. if !closed {
  354. // still an open connection, so continue using it
  355. return
  356. }
  357. pool.mu.Lock()
  358. defer pool.mu.Unlock()
  359. if pool.closed {
  360. // pool closed
  361. return
  362. }
  363. // find the connection index
  364. for i, candidate := range pool.conns {
  365. if candidate == conn {
  366. // remove the connection, not preserving order
  367. pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]
  368. // update the policy
  369. pool.policy.SetConns(pool.conns)
  370. // lost a connection, so fill the pool
  371. go pool.fill()
  372. break
  373. }
  374. }
  375. }
  376. // removes and closes all connections from the pool
  377. func (pool *hostConnPool) drain() {
  378. pool.mu.Lock()
  379. defer pool.mu.Unlock()
  380. // empty the pool
  381. conns := pool.conns
  382. pool.conns = pool.conns[:0]
  383. // update the policy
  384. pool.policy.SetConns(pool.conns)
  385. // close the connections
  386. for _, conn := range conns {
  387. conn.Close()
  388. }
  389. }