|
|
@@ -148,19 +148,20 @@ type ClientConn struct {
|
|
|
readerDone chan struct{} // closed on error
|
|
|
readerErr error // set before readerDone is closed
|
|
|
|
|
|
- mu sync.Mutex // guards following
|
|
|
- cond *sync.Cond // hold mu; broadcast on flow/closed changes
|
|
|
- flow flow // our conn-level flow control quota (cs.flow is per stream)
|
|
|
- inflow flow // peer's conn-level flow control
|
|
|
- closed bool
|
|
|
- goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
|
|
|
- goAwayDebug string // goAway frame's debug data, retained as a string
|
|
|
- streams map[uint32]*clientStream // client-initiated
|
|
|
- nextStreamID uint32
|
|
|
- bw *bufio.Writer
|
|
|
- br *bufio.Reader
|
|
|
- fr *Framer
|
|
|
- lastActive time.Time
|
|
|
+ mu sync.Mutex // guards following
|
|
|
+ cond *sync.Cond // hold mu; broadcast on flow/closed changes
|
|
|
+ flow flow // our conn-level flow control quota (cs.flow is per stream)
|
|
|
+ inflow flow // peer's conn-level flow control
|
|
|
+ closed bool
|
|
|
+ wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
|
|
|
+ goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
|
|
|
+ goAwayDebug string // goAway frame's debug data, retained as a string
|
|
|
+ streams map[uint32]*clientStream // client-initiated
|
|
|
+ nextStreamID uint32
|
|
|
+ bw *bufio.Writer
|
|
|
+ br *bufio.Reader
|
|
|
+ fr *Framer
|
|
|
+ lastActive time.Time
|
|
|
|
|
|
// Settings from peer:
|
|
|
maxFrameSize uint32
|
|
|
@@ -416,10 +417,6 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
|
|
|
if VerboseLogs {
|
|
|
t.vlogf("http2: Transport creating client conn to %v", c.RemoteAddr())
|
|
|
}
|
|
|
- if _, err := c.Write(clientPreface); err != nil {
|
|
|
- t.vlogf("client preface write error: %v", err)
|
|
|
- return nil, err
|
|
|
- }
|
|
|
|
|
|
cc := &ClientConn{
|
|
|
t: t,
|
|
|
@@ -431,6 +428,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
|
|
|
maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
|
|
|
streams: make(map[uint32]*clientStream),
|
|
|
singleUse: singleUse,
|
|
|
+ wantSettingsAck: true,
|
|
|
}
|
|
|
cc.cond = sync.NewCond(&cc.mu)
|
|
|
cc.flow.add(int32(initialWindowSize))
|
|
|
@@ -459,6 +457,8 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
|
|
|
if max := t.maxHeaderListSize(); max != 0 {
|
|
|
initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
|
|
|
}
|
|
|
+
|
|
|
+ cc.bw.Write(clientPreface)
|
|
|
cc.fr.WriteSettings(initialSettings...)
|
|
|
cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
|
|
|
cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
|
|
|
@@ -467,33 +467,6 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
|
|
|
return nil, cc.werr
|
|
|
}
|
|
|
|
|
|
- // Read the obligatory SETTINGS frame
|
|
|
- f, err := cc.fr.ReadFrame()
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- sf, ok := f.(*SettingsFrame)
|
|
|
- if !ok {
|
|
|
- return nil, fmt.Errorf("expected settings frame, got: %T", f)
|
|
|
- }
|
|
|
- cc.fr.WriteSettingsAck()
|
|
|
- cc.bw.Flush()
|
|
|
-
|
|
|
- sf.ForeachSetting(func(s Setting) error {
|
|
|
- switch s.ID {
|
|
|
- case SettingMaxFrameSize:
|
|
|
- cc.maxFrameSize = s.Val
|
|
|
- case SettingMaxConcurrentStreams:
|
|
|
- cc.maxConcurrentStreams = s.Val
|
|
|
- case SettingInitialWindowSize:
|
|
|
- cc.initialWindowSize = s.Val
|
|
|
- default:
|
|
|
- // TODO(bradfitz): handle more; at least SETTINGS_HEADER_TABLE_SIZE?
|
|
|
- t.vlogf("Unhandled Setting: %v", s)
|
|
|
- }
|
|
|
- return nil
|
|
|
- })
|
|
|
-
|
|
|
go cc.readLoop()
|
|
|
return cc, nil
|
|
|
}
|
|
|
@@ -936,28 +909,26 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ var trls []byte
|
|
|
+ if !sentEnd && hasTrailers {
|
|
|
+ cc.mu.Lock()
|
|
|
+ defer cc.mu.Unlock()
|
|
|
+ trls = cc.encodeTrailers(req)
|
|
|
+ }
|
|
|
+
|
|
|
cc.wmu.Lock()
|
|
|
- if !sentEnd {
|
|
|
- var trls []byte
|
|
|
- if hasTrailers {
|
|
|
- cc.mu.Lock()
|
|
|
- trls = cc.encodeTrailers(req)
|
|
|
- cc.mu.Unlock()
|
|
|
- }
|
|
|
+ defer cc.wmu.Unlock()
|
|
|
|
|
|
- // Avoid forgetting to send an END_STREAM if the encoded
|
|
|
- // trailers are 0 bytes. Both results produce and END_STREAM.
|
|
|
- if len(trls) > 0 {
|
|
|
- err = cc.writeHeaders(cs.ID, true, trls)
|
|
|
- } else {
|
|
|
- err = cc.fr.WriteData(cs.ID, true, nil)
|
|
|
- }
|
|
|
+ // Avoid forgetting to send an END_STREAM if the encoded
|
|
|
+ // trailers are 0 bytes. Both results produce and END_STREAM.
|
|
|
+ if len(trls) > 0 {
|
|
|
+ err = cc.writeHeaders(cs.ID, true, trls)
|
|
|
+ } else {
|
|
|
+ err = cc.fr.WriteData(cs.ID, true, nil)
|
|
|
}
|
|
|
if ferr := cc.bw.Flush(); ferr != nil && err == nil {
|
|
|
err = ferr
|
|
|
}
|
|
|
- cc.wmu.Unlock()
|
|
|
-
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
@@ -1203,6 +1174,14 @@ func (e GoAwayError) Error() string {
|
|
|
e.LastStreamID, e.ErrCode, e.DebugData)
|
|
|
}
|
|
|
|
|
|
+func isEOFOrNetReadError(err error) bool {
|
|
|
+ if err == io.EOF {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ ne, ok := err.(*net.OpError)
|
|
|
+ return ok && ne.Op == "read"
|
|
|
+}
|
|
|
+
|
|
|
func (rl *clientConnReadLoop) cleanup() {
|
|
|
cc := rl.cc
|
|
|
defer cc.tconn.Close()
|
|
|
@@ -1214,16 +1193,14 @@ func (rl *clientConnReadLoop) cleanup() {
|
|
|
// gotten a response yet.
|
|
|
err := cc.readerErr
|
|
|
cc.mu.Lock()
|
|
|
- if err == io.EOF {
|
|
|
- if cc.goAway != nil {
|
|
|
- err = GoAwayError{
|
|
|
- LastStreamID: cc.goAway.LastStreamID,
|
|
|
- ErrCode: cc.goAway.ErrCode,
|
|
|
- DebugData: cc.goAwayDebug,
|
|
|
- }
|
|
|
- } else {
|
|
|
- err = io.ErrUnexpectedEOF
|
|
|
+ if cc.goAway != nil && isEOFOrNetReadError(err) {
|
|
|
+ err = GoAwayError{
|
|
|
+ LastStreamID: cc.goAway.LastStreamID,
|
|
|
+ ErrCode: cc.goAway.ErrCode,
|
|
|
+ DebugData: cc.goAwayDebug,
|
|
|
}
|
|
|
+ } else if err == io.EOF {
|
|
|
+ err = io.ErrUnexpectedEOF
|
|
|
}
|
|
|
for _, cs := range rl.activeRes {
|
|
|
cs.bufPipe.CloseWithError(err)
|
|
|
@@ -1243,7 +1220,8 @@ func (rl *clientConnReadLoop) cleanup() {
|
|
|
func (rl *clientConnReadLoop) run() error {
|
|
|
cc := rl.cc
|
|
|
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
|
|
|
- gotReply := false // ever saw a reply
|
|
|
+ gotReply := false // ever saw a HEADERS reply
|
|
|
+ gotSettings := false
|
|
|
for {
|
|
|
f, err := cc.fr.ReadFrame()
|
|
|
if err != nil {
|
|
|
@@ -1260,6 +1238,13 @@ func (rl *clientConnReadLoop) run() error {
|
|
|
if VerboseLogs {
|
|
|
cc.vlogf("http2: Transport received %s", summarizeFrame(f))
|
|
|
}
|
|
|
+ if !gotSettings {
|
|
|
+ if _, ok := f.(*SettingsFrame); !ok {
|
|
|
+ cc.logf("protocol error: received %T before a SETTINGS frame", f)
|
|
|
+ return ConnectionError(ErrCodeProtocol)
|
|
|
+ }
|
|
|
+ gotSettings = true
|
|
|
+ }
|
|
|
maybeIdle := false // whether frame might transition us to idle
|
|
|
|
|
|
switch f := f.(type) {
|
|
|
@@ -1681,7 +1666,16 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
|
|
|
cc := rl.cc
|
|
|
cc.mu.Lock()
|
|
|
defer cc.mu.Unlock()
|
|
|
- return f.ForeachSetting(func(s Setting) error {
|
|
|
+
|
|
|
+ if f.IsAck() {
|
|
|
+ if cc.wantSettingsAck {
|
|
|
+ cc.wantSettingsAck = false
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return ConnectionError(ErrCodeProtocol)
|
|
|
+ }
|
|
|
+
|
|
|
+ err := f.ForeachSetting(func(s Setting) error {
|
|
|
switch s.ID {
|
|
|
case SettingMaxFrameSize:
|
|
|
cc.maxFrameSize = s.Val
|
|
|
@@ -1700,6 +1694,16 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
|
|
|
}
|
|
|
return nil
|
|
|
})
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ cc.wmu.Lock()
|
|
|
+ defer cc.wmu.Unlock()
|
|
|
+
|
|
|
+ cc.fr.WriteSettingsAck()
|
|
|
+ cc.bw.Flush()
|
|
|
+ return cc.werr
|
|
|
}
|
|
|
|
|
|
func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
|