client_conn_pool.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Transport code's client connection pooling.
  5. package http2
  6. import (
  7. "crypto/tls"
  8. "net/http"
  9. "sync"
  10. )
  11. // ClientConnPool manages a pool of HTTP/2 client connections.
  12. type ClientConnPool interface {
  13. GetClientConn(req *http.Request, addr string) (*ClientConn, error)
  14. MarkDead(*ClientConn)
  15. }
  16. // clientConnPoolIdleCloser is the interface implemented by ClientConnPool
  17. // implementations which can close their idle connections.
  18. type clientConnPoolIdleCloser interface {
  19. ClientConnPool
  20. closeIdleConnections()
  21. }
  22. var (
  23. _ clientConnPoolIdleCloser = (*clientConnPool)(nil)
  24. _ clientConnPoolIdleCloser = noDialClientConnPool{}
  25. )
  26. // TODO: use singleflight for dialing and addConnCalls?
  27. type clientConnPool struct {
  28. t *Transport
  29. mu sync.Mutex // TODO: maybe switch to RWMutex
  30. // TODO: add support for sharing conns based on cert names
  31. // (e.g. share conn for googleapis.com and appspot.com)
  32. conns map[string][]*ClientConn // key is host:port
  33. dialing map[string]*dialCall // currently in-flight dials
  34. keys map[*ClientConn][]string
  35. addConnCalls map[string]*addConnCall // in-flight addConnIfNeede calls
  36. }
  37. func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
  38. return p.getClientConn(req, addr, dialOnMiss)
  39. }
  // Readable names for the dialOnMiss argument of getClientConn.
  const (
  	// dialOnMiss: start (or join) a dial when no pooled connection is usable.
  	dialOnMiss = true
  	// noDialOnMiss: report ErrNoCachedConn instead of dialing.
  	noDialOnMiss = false
  )
  44. func (p *clientConnPool) getClientConn(_ *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
  45. p.mu.Lock()
  46. for _, cc := range p.conns[addr] {
  47. if cc.CanTakeNewRequest() {
  48. p.mu.Unlock()
  49. return cc, nil
  50. }
  51. }
  52. if !dialOnMiss {
  53. p.mu.Unlock()
  54. return nil, ErrNoCachedConn
  55. }
  56. call := p.getStartDialLocked(addr)
  57. p.mu.Unlock()
  58. <-call.done
  59. return call.res, call.err
  60. }
  61. // dialCall is an in-flight Transport dial call to a host.
  62. type dialCall struct {
  63. p *clientConnPool
  64. done chan struct{} // closed when done
  65. res *ClientConn // valid after done is closed
  66. err error // valid after done is closed
  67. }
  68. // requires p.mu is held.
  69. func (p *clientConnPool) getStartDialLocked(addr string) *dialCall {
  70. if call, ok := p.dialing[addr]; ok {
  71. // A dial is already in-flight. Don't start another.
  72. return call
  73. }
  74. call := &dialCall{p: p, done: make(chan struct{})}
  75. if p.dialing == nil {
  76. p.dialing = make(map[string]*dialCall)
  77. }
  78. p.dialing[addr] = call
  79. go call.dial(addr)
  80. return call
  81. }
  82. // run in its own goroutine.
  83. func (c *dialCall) dial(addr string) {
  84. c.res, c.err = c.p.t.dialClientConn(addr)
  85. close(c.done)
  86. c.p.mu.Lock()
  87. delete(c.p.dialing, addr)
  88. if c.err == nil {
  89. c.p.addConnLocked(addr, c.res)
  90. }
  91. c.p.mu.Unlock()
  92. }
  93. // addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
  94. // already exist. It coalesces concurrent calls with the same key.
  95. // This is used by the http1 Transport code when it creates a new connection. Because
  96. // the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
  97. // the protocol), it can get into a situation where it has multiple TLS connections.
  98. // This code decides which ones live or die.
  99. // The return value used is whether c was used.
  100. // c is never closed.
  101. func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {
  102. p.mu.Lock()
  103. for _, cc := range p.conns[key] {
  104. if cc.CanTakeNewRequest() {
  105. p.mu.Unlock()
  106. return false, nil
  107. }
  108. }
  109. call, dup := p.addConnCalls[key]
  110. if !dup {
  111. if p.addConnCalls == nil {
  112. p.addConnCalls = make(map[string]*addConnCall)
  113. }
  114. call = &addConnCall{
  115. p: p,
  116. done: make(chan struct{}),
  117. }
  118. p.addConnCalls[key] = call
  119. go call.run(t, key, c)
  120. }
  121. p.mu.Unlock()
  122. <-call.done
  123. if call.err != nil {
  124. return false, call.err
  125. }
  126. return !dup, nil
  127. }
  128. type addConnCall struct {
  129. p *clientConnPool
  130. done chan struct{} // closed when done
  131. err error
  132. }
  133. func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
  134. cc, err := t.NewClientConn(tc)
  135. p := c.p
  136. p.mu.Lock()
  137. if err != nil {
  138. c.err = err
  139. } else {
  140. p.addConnLocked(key, cc)
  141. }
  142. delete(p.addConnCalls, key)
  143. p.mu.Unlock()
  144. close(c.done)
  145. }
  146. func (p *clientConnPool) addConn(key string, cc *ClientConn) {
  147. p.mu.Lock()
  148. p.addConnLocked(key, cc)
  149. p.mu.Unlock()
  150. }
  151. // p.mu must be held
  152. func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
  153. for _, v := range p.conns[key] {
  154. if v == cc {
  155. return
  156. }
  157. }
  158. if p.conns == nil {
  159. p.conns = make(map[string][]*ClientConn)
  160. }
  161. if p.keys == nil {
  162. p.keys = make(map[*ClientConn][]string)
  163. }
  164. p.conns[key] = append(p.conns[key], cc)
  165. p.keys[cc] = append(p.keys[cc], key)
  166. }
  167. func (p *clientConnPool) MarkDead(cc *ClientConn) {
  168. p.mu.Lock()
  169. defer p.mu.Unlock()
  170. for _, key := range p.keys[cc] {
  171. vv, ok := p.conns[key]
  172. if !ok {
  173. continue
  174. }
  175. newList := filterOutClientConn(vv, cc)
  176. if len(newList) > 0 {
  177. p.conns[key] = newList
  178. } else {
  179. delete(p.conns, key)
  180. }
  181. }
  182. delete(p.keys, cc)
  183. }
  184. func (p *clientConnPool) closeIdleConnections() {
  185. p.mu.Lock()
  186. defer p.mu.Unlock()
  187. // TODO: don't close a cc if it was just added to the pool
  188. // milliseconds ago and has never been used. There's currently
  189. // a small race window with the HTTP/1 Transport's integration
  190. // where it can add an idle conn just before using it, and
  191. // somebody else can concurrently call CloseIdleConns and
  192. // break some caller's RoundTrip.
  193. for _, vv := range p.conns {
  194. for _, cc := range vv {
  195. cc.closeIfIdle()
  196. }
  197. }
  198. }
  199. func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
  200. out := in[:0]
  201. for _, v := range in {
  202. if v != exclude {
  203. out = append(out, v)
  204. }
  205. }
  206. // If we filtered it out, zero out the last item to prevent
  207. // the GC from seeing it.
  208. if len(in) != len(out) {
  209. in[len(in)-1] = nil
  210. }
  211. return out
  212. }
  213. // noDialClientConnPool is an implementation of http2.ClientConnPool
  214. // which never dials. We let the HTTP/1.1 client dial and use its TLS
  215. // connection instead.
  216. type noDialClientConnPool struct{ *clientConnPool }
  217. func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
  218. return p.getClientConn(req, addr, noDialOnMiss)
  219. }