client_conn_pool.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Transport code's client connection pooling.
  5. package http2
  6. import (
  7. "crypto/tls"
  8. "net/http"
  9. "sync"
  10. )
// ClientConnPool manages a pool of HTTP/2 client connections.
type ClientConnPool interface {
	// GetClientConn returns a connection to addr that can carry req, or an
	// error if none can be provided.
	GetClientConn(req *http.Request, addr string) (*ClientConn, error)
	// MarkDead reports that the given connection should no longer be
	// handed out by the pool.
	MarkDead(*ClientConn)
}
// clientConnPoolIdleCloser is the interface implemented by ClientConnPool
// implementations which can close their idle connections.
type clientConnPoolIdleCloser interface {
	ClientConnPool
	// closeIdleConnections closes the pooled connections that are
	// currently idle.
	closeIdleConnections()
}
// Compile-time assertions that both pool types declared in this file
// satisfy clientConnPoolIdleCloser.
var (
	_ clientConnPoolIdleCloser = (*clientConnPool)(nil)
	_ clientConnPoolIdleCloser = noDialClientConnPool{}
)
// clientConnPool is the ClientConnPool implementation in this file. It keeps
// connections indexed by address and coalesces concurrent dials and
// concurrent addConnIfNeeded calls that target the same key.
//
// TODO: use singleflight for dialing and addConnCalls?
type clientConnPool struct {
	t *Transport // Transport used to dial new connections

	mu sync.Mutex // TODO: maybe switch to RWMutex
	// TODO: add support for sharing conns based on cert names
	// (e.g. share conn for googleapis.com and appspot.com)
	conns        map[string][]*ClientConn // key is host:port
	dialing      map[string]*dialCall     // currently in-flight dials
	keys         map[*ClientConn][]string // reverse index: the conns keys each connection is stored under
	addConnCalls map[string]*addConnCall  // in-flight addConnIfNeeded calls
}
// GetClientConn returns a connection to addr that can carry req. It calls
// getClientConn with dialOnMiss, so a new connection is dialed when no pooled
// connection can take the request.
func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
	return p.getClientConn(req, addr, dialOnMiss)
}
// Values for the dialOnMiss argument of getClientConn.
const (
	dialOnMiss   = true  // dial a new connection when no pooled one can take the request
	noDialOnMiss = false // return ErrNoCachedConn instead of dialing
)
  44. // shouldTraceGetConn reports whether getClientConn should call any
  45. // ClientTrace.GetConn hook associated with the http.Request.
  46. //
  47. // This complexity is needed to avoid double calls of the GetConn hook
  48. // during the back-and-forth between net/http and x/net/http2 (when the
  49. // net/http.Transport is upgraded to also speak http2), as well as support
  50. // the case where x/net/http2 is being used directly.
  51. func (p *clientConnPool) shouldTraceGetConn(st clientConnIdleState) bool {
  52. // If our Transport wasn't made via ConfigureTransport, always
  53. // trace the GetConn hook if provided, because that means the
  54. // http2 package is being used directly and it's the one
  55. // dialing, as opposed to net/http.
  56. if _, ok := p.t.ConnPool.(noDialClientConnPool); !ok {
  57. return true
  58. }
  59. // Otherwise, only use the GetConn hook if this connection has
  60. // been used previously for other requests. For fresh
  61. // connections, the net/http package does the dialing.
  62. return !st.freshConn
  63. }
  64. func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
  65. if isConnectionCloseRequest(req) && dialOnMiss {
  66. // It gets its own connection.
  67. traceGetConn(req, addr)
  68. const singleUse = true
  69. cc, err := p.t.dialClientConn(addr, singleUse)
  70. if err != nil {
  71. return nil, err
  72. }
  73. return cc, nil
  74. }
  75. p.mu.Lock()
  76. for _, cc := range p.conns[addr] {
  77. if st := cc.idleState(); st.canTakeNewRequest {
  78. if p.shouldTraceGetConn(st) {
  79. traceGetConn(req, addr)
  80. }
  81. p.mu.Unlock()
  82. return cc, nil
  83. }
  84. }
  85. if !dialOnMiss {
  86. p.mu.Unlock()
  87. return nil, ErrNoCachedConn
  88. }
  89. traceGetConn(req, addr)
  90. call := p.getStartDialLocked(addr)
  91. p.mu.Unlock()
  92. <-call.done
  93. return call.res, call.err
  94. }
// dialCall is an in-flight Transport dial call to a host.
// Callers that want a connection to the same address share one dialCall
// and wait on done.
type dialCall struct {
	p    *clientConnPool // pool that started the dial and receives its result
	done chan struct{}   // closed when done
	res  *ClientConn     // valid after done is closed
	err  error           // valid after done is closed
}
  102. // requires p.mu is held.
  103. func (p *clientConnPool) getStartDialLocked(addr string) *dialCall {
  104. if call, ok := p.dialing[addr]; ok {
  105. // A dial is already in-flight. Don't start another.
  106. return call
  107. }
  108. call := &dialCall{p: p, done: make(chan struct{})}
  109. if p.dialing == nil {
  110. p.dialing = make(map[string]*dialCall)
  111. }
  112. p.dialing[addr] = call
  113. go call.dial(addr)
  114. return call
  115. }
  116. // run in its own goroutine.
  117. func (c *dialCall) dial(addr string) {
  118. const singleUse = false // shared conn
  119. c.res, c.err = c.p.t.dialClientConn(addr, singleUse)
  120. close(c.done)
  121. c.p.mu.Lock()
  122. delete(c.p.dialing, addr)
  123. if c.err == nil {
  124. c.p.addConnLocked(addr, c.res)
  125. }
  126. c.p.mu.Unlock()
  127. }
  128. // addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
  129. // already exist. It coalesces concurrent calls with the same key.
  130. // This is used by the http1 Transport code when it creates a new connection. Because
  131. // the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
  132. // the protocol), it can get into a situation where it has multiple TLS connections.
  133. // This code decides which ones live or die.
  134. // The return value used is whether c was used.
  135. // c is never closed.
  136. func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {
  137. p.mu.Lock()
  138. for _, cc := range p.conns[key] {
  139. if cc.CanTakeNewRequest() {
  140. p.mu.Unlock()
  141. return false, nil
  142. }
  143. }
  144. call, dup := p.addConnCalls[key]
  145. if !dup {
  146. if p.addConnCalls == nil {
  147. p.addConnCalls = make(map[string]*addConnCall)
  148. }
  149. call = &addConnCall{
  150. p: p,
  151. done: make(chan struct{}),
  152. }
  153. p.addConnCalls[key] = call
  154. go call.run(t, key, c)
  155. }
  156. p.mu.Unlock()
  157. <-call.done
  158. if call.err != nil {
  159. return false, call.err
  160. }
  161. return !dup, nil
  162. }
// addConnCall is an in-flight addConnIfNeeded attempt for one key.
// Concurrent callers for the same key wait on done instead of starting
// their own attempt.
type addConnCall struct {
	p    *clientConnPool // pool the new connection is added to
	done chan struct{}   // closed when done
	err  error           // set by run when NewClientConn fails; read after done is closed
}
  168. func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
  169. cc, err := t.NewClientConn(tc)
  170. p := c.p
  171. p.mu.Lock()
  172. if err != nil {
  173. c.err = err
  174. } else {
  175. p.addConnLocked(key, cc)
  176. }
  177. delete(p.addConnCalls, key)
  178. p.mu.Unlock()
  179. close(c.done)
  180. }
// addConn adds cc to the pool under key, holding p.mu for the duration.
// It does nothing if cc is already stored under key.
func (p *clientConnPool) addConn(key string, cc *ClientConn) {
	p.mu.Lock()
	p.addConnLocked(key, cc)
	p.mu.Unlock()
}
  186. // p.mu must be held
  187. func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
  188. for _, v := range p.conns[key] {
  189. if v == cc {
  190. return
  191. }
  192. }
  193. if p.conns == nil {
  194. p.conns = make(map[string][]*ClientConn)
  195. }
  196. if p.keys == nil {
  197. p.keys = make(map[*ClientConn][]string)
  198. }
  199. p.conns[key] = append(p.conns[key], cc)
  200. p.keys[cc] = append(p.keys[cc], key)
  201. }
  202. func (p *clientConnPool) MarkDead(cc *ClientConn) {
  203. p.mu.Lock()
  204. defer p.mu.Unlock()
  205. for _, key := range p.keys[cc] {
  206. vv, ok := p.conns[key]
  207. if !ok {
  208. continue
  209. }
  210. newList := filterOutClientConn(vv, cc)
  211. if len(newList) > 0 {
  212. p.conns[key] = newList
  213. } else {
  214. delete(p.conns, key)
  215. }
  216. }
  217. delete(p.keys, cc)
  218. }
  219. func (p *clientConnPool) closeIdleConnections() {
  220. p.mu.Lock()
  221. defer p.mu.Unlock()
  222. // TODO: don't close a cc if it was just added to the pool
  223. // milliseconds ago and has never been used. There's currently
  224. // a small race window with the HTTP/1 Transport's integration
  225. // where it can add an idle conn just before using it, and
  226. // somebody else can concurrently call CloseIdleConns and
  227. // break some caller's RoundTrip.
  228. for _, vv := range p.conns {
  229. for _, cc := range vv {
  230. cc.closeIfIdle()
  231. }
  232. }
  233. }
  234. func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
  235. out := in[:0]
  236. for _, v := range in {
  237. if v != exclude {
  238. out = append(out, v)
  239. }
  240. }
  241. // If we filtered it out, zero out the last item to prevent
  242. // the GC from seeing it.
  243. if len(in) != len(out) {
  244. in[len(in)-1] = nil
  245. }
  246. return out
  247. }
// noDialClientConnPool is an implementation of http2.ClientConnPool
// which never dials. We let the HTTP/1.1 client dial and use its TLS
// connection instead.
//
// It embeds *clientConnPool and overrides only GetClientConn.
type noDialClientConnPool struct{ *clientConnPool }
// GetClientConn returns a pooled connection to addr that can carry req. It
// calls getClientConn with noDialOnMiss, so it never dials: when no pooled
// connection can take the request the result is ErrNoCachedConn.
func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
	return p.getClientConn(req, addr, noDialOnMiss)
}