|
@@ -7,6 +7,7 @@
|
|
|
package http2
|
|
package http2
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
|
|
+ "crypto/tls"
|
|
|
"net/http"
|
|
"net/http"
|
|
|
"sync"
|
|
"sync"
|
|
|
)
|
|
)
|
|
@@ -17,21 +18,29 @@ type ClientConnPool interface {
|
|
|
MarkDead(*ClientConn)
|
|
MarkDead(*ClientConn)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// TODO: use singleflight for dialing and addConnCalls?
|
|
|
type clientConnPool struct {
|
|
type clientConnPool struct {
|
|
|
- t *Transport
|
|
|
|
|
|
|
+ t *Transport
|
|
|
|
|
+
|
|
|
mu sync.Mutex // TODO: maybe switch to RWMutex
|
|
mu sync.Mutex // TODO: maybe switch to RWMutex
|
|
|
// TODO: add support for sharing conns based on cert names
|
|
// TODO: add support for sharing conns based on cert names
|
|
|
// (e.g. share conn for googleapis.com and appspot.com)
|
|
// (e.g. share conn for googleapis.com and appspot.com)
|
|
|
- conns map[string][]*ClientConn // key is host:port
|
|
|
|
|
- dialing map[string]*dialCall // currently in-flight dials
|
|
|
|
|
- keys map[*ClientConn][]string
|
|
|
|
|
|
|
+ conns map[string][]*ClientConn // key is host:port
|
|
|
|
|
+ dialing map[string]*dialCall // currently in-flight dials
|
|
|
|
|
+ keys map[*ClientConn][]string
|
|
|
|
|
+ addConnCalls map[string]*addConnCall // in-flight addConnIfNeede calls
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
|
|
func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
|
|
|
- return p.getClientConn(req, addr, true)
|
|
|
|
|
|
|
+ return p.getClientConn(req, addr, dialOnMiss)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
|
|
|
|
|
|
|
+const (
|
|
|
|
|
+ dialOnMiss = true
|
|
|
|
|
+ noDialOnMiss = false
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+func (p *clientConnPool) getClientConn(_ *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
|
|
|
p.mu.Lock()
|
|
p.mu.Lock()
|
|
|
for _, cc := range p.conns[addr] {
|
|
for _, cc := range p.conns[addr] {
|
|
|
if cc.CanTakeNewRequest() {
|
|
if cc.CanTakeNewRequest() {
|
|
@@ -85,6 +94,64 @@ func (c *dialCall) dial(addr string) {
|
|
|
c.p.mu.Unlock()
|
|
c.p.mu.Unlock()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
|
|
|
|
|
+// already exist. It coalesces concurrent calls with the same key.
|
|
|
|
|
+// This is used by the http1 Transport code when it creates a new connection. Because
|
|
|
|
|
+// the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
|
|
|
|
|
+// the protocol), it can get into a situation where it has multiple TLS connections.
|
|
|
|
|
+// This code decides which ones live or die.
|
|
|
|
|
+// The return value used is whether c was used.
|
|
|
|
|
+// c is never closed.
|
|
|
|
|
+func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {
|
|
|
|
|
+ p.mu.Lock()
|
|
|
|
|
+ for _, cc := range p.conns[key] {
|
|
|
|
|
+ if cc.CanTakeNewRequest() {
|
|
|
|
|
+ p.mu.Unlock()
|
|
|
|
|
+ return false, nil
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ call, dup := p.addConnCalls[key]
|
|
|
|
|
+ if !dup {
|
|
|
|
|
+ if p.addConnCalls == nil {
|
|
|
|
|
+ p.addConnCalls = make(map[string]*addConnCall)
|
|
|
|
|
+ }
|
|
|
|
|
+ call = &addConnCall{
|
|
|
|
|
+ p: p,
|
|
|
|
|
+ done: make(chan struct{}),
|
|
|
|
|
+ }
|
|
|
|
|
+ p.addConnCalls[key] = call
|
|
|
|
|
+ go call.run(t, key, c)
|
|
|
|
|
+ }
|
|
|
|
|
+ p.mu.Unlock()
|
|
|
|
|
+
|
|
|
|
|
+ <-call.done
|
|
|
|
|
+ if call.err != nil {
|
|
|
|
|
+ return false, call.err
|
|
|
|
|
+ }
|
|
|
|
|
+ return !dup, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+type addConnCall struct {
|
|
|
|
|
+ p *clientConnPool
|
|
|
|
|
+ done chan struct{} // closed when done
|
|
|
|
|
+ err error
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
|
|
|
|
|
+ cc, err := t.NewClientConn(tc)
|
|
|
|
|
+
|
|
|
|
|
+ p := c.p
|
|
|
|
|
+ p.mu.Lock()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ c.err = err
|
|
|
|
|
+ } else {
|
|
|
|
|
+ p.addConnLocked(key, cc)
|
|
|
|
|
+ }
|
|
|
|
|
+ delete(p.addConnCalls, key)
|
|
|
|
|
+ p.mu.Unlock()
|
|
|
|
|
+ close(c.done)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (p *clientConnPool) addConn(key string, cc *ClientConn) {
|
|
func (p *clientConnPool) addConn(key string, cc *ClientConn) {
|
|
|
p.mu.Lock()
|
|
p.mu.Lock()
|
|
|
p.addConnLocked(key, cc)
|
|
p.addConnLocked(key, cc)
|