client_conn_pool.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Transport code's client connection pooling.
  5. package http2
  6. import (
  7. "net/http"
  8. "sync"
  9. )
  10. // ClientConnPool manages a pool of HTTP/2 client connections.
  11. type ClientConnPool interface {
  12. GetClientConn(req *http.Request, addr string) (*ClientConn, error)
  13. MarkDead(*ClientConn)
  14. }
  15. type clientConnPool struct {
  16. t *Transport
  17. mu sync.Mutex // TODO: maybe switch to RWMutex
  18. // TODO: add support for sharing conns based on cert names
  19. // (e.g. share conn for googleapis.com and appspot.com)
  20. conns map[string][]*ClientConn // key is host:port
  21. dialing map[string]*dialCall // currently in-flight dials
  22. keys map[*ClientConn][]string
  23. }
  24. func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
  25. return p.getClientConn(req, addr, true)
  26. }
  27. func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
  28. p.mu.Lock()
  29. for _, cc := range p.conns[addr] {
  30. if cc.CanTakeNewRequest() {
  31. p.mu.Unlock()
  32. return cc, nil
  33. }
  34. }
  35. if !dialOnMiss {
  36. p.mu.Unlock()
  37. return nil, ErrNoCachedConn
  38. }
  39. call := p.getStartDialLocked(addr)
  40. p.mu.Unlock()
  41. <-call.done
  42. return call.res, call.err
  43. }
  44. // dialCall is an in-flight Transport dial call to a host.
  45. type dialCall struct {
  46. p *clientConnPool
  47. done chan struct{} // closed when done
  48. res *ClientConn // valid after done is closed
  49. err error // valid after done is closed
  50. }
  51. // requires p.mu is held.
  52. func (p *clientConnPool) getStartDialLocked(addr string) *dialCall {
  53. if call, ok := p.dialing[addr]; ok {
  54. // A dial is already in-flight. Don't start another.
  55. return call
  56. }
  57. call := &dialCall{p: p, done: make(chan struct{})}
  58. if p.dialing == nil {
  59. p.dialing = make(map[string]*dialCall)
  60. }
  61. p.dialing[addr] = call
  62. go call.dial(addr)
  63. return call
  64. }
  65. // run in its own goroutine.
  66. func (c *dialCall) dial(addr string) {
  67. c.res, c.err = c.p.t.dialClientConn(addr)
  68. close(c.done)
  69. c.p.mu.Lock()
  70. delete(c.p.dialing, addr)
  71. if c.err == nil {
  72. c.p.addConnLocked(addr, c.res)
  73. }
  74. c.p.mu.Unlock()
  75. }
  76. func (p *clientConnPool) addConn(key string, cc *ClientConn) {
  77. p.mu.Lock()
  78. p.addConnLocked(key, cc)
  79. p.mu.Unlock()
  80. }
  81. // p.mu must be held
  82. func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
  83. for _, v := range p.conns[key] {
  84. if v == cc {
  85. return
  86. }
  87. }
  88. if p.conns == nil {
  89. p.conns = make(map[string][]*ClientConn)
  90. }
  91. if p.keys == nil {
  92. p.keys = make(map[*ClientConn][]string)
  93. }
  94. p.conns[key] = append(p.conns[key], cc)
  95. p.keys[cc] = append(p.keys[cc], key)
  96. }
  97. func (p *clientConnPool) MarkDead(cc *ClientConn) {
  98. p.mu.Lock()
  99. defer p.mu.Unlock()
  100. for _, key := range p.keys[cc] {
  101. vv, ok := p.conns[key]
  102. if !ok {
  103. continue
  104. }
  105. newList := filterOutClientConn(vv, cc)
  106. if len(newList) > 0 {
  107. p.conns[key] = newList
  108. } else {
  109. delete(p.conns, key)
  110. }
  111. }
  112. delete(p.keys, cc)
  113. }
  114. func (p *clientConnPool) closeIdleConnections() {
  115. p.mu.Lock()
  116. defer p.mu.Unlock()
  117. // TODO: don't close a cc if it was just added to the pool
  118. // milliseconds ago and has never been used. There's currently
  119. // a small race window with the HTTP/1 Transport's integration
  120. // where it can add an idle conn just before using it, and
  121. // somebody else can concurrently call CloseIdleConns and
  122. // break some caller's RoundTrip.
  123. for _, vv := range p.conns {
  124. for _, cc := range vv {
  125. cc.closeIfIdle()
  126. }
  127. }
  128. }
  129. func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
  130. out := in[:0]
  131. for _, v := range in {
  132. if v != exclude {
  133. out = append(out, v)
  134. }
  135. }
  136. // If we filtered it out, zero out the last item to prevent
  137. // the GC from seeing it.
  138. if len(in) != len(out) {
  139. in[len(in)-1] = nil
  140. }
  141. return out
  142. }