Bladeren bron

http2: push stream look up later in Transport, address some TODOs/cleanups

Change-Id: Iaed80981b985e6e02ede5afc4a224490c4ceac01
Reviewed-on: https://go-review.googlesource.com/16380
Reviewed-by: Blake Mizerany <blake.mizerany@gmail.com>
Brad Fitzpatrick 10 jaren geleden
bovenliggende
commit
efe38d9b30
1 gewijzigde bestanden met toevoegingen van 48 en 54 verwijderingen
  1. 48 54
      http2/transport.go

+ 48 - 54
http2/transport.go

@@ -97,9 +97,6 @@ type clientStream struct {
 	resc    chan resAndError
 	bufPipe pipe // buffered pipe with the flow-controlled response payload
 
-	// Owned by readLoop goroutine:
-	ended bool // on STREAM_ENDED from any of HEADERS/CONTINUATION/DATA
-
 	flow   flow // guarded by cc.mu
 	inflow flow // guarded by cc.mu
 
@@ -674,11 +671,11 @@ func (cc *clientConn) readLoop() {
 		cc.fr.WriteGoAway(0, ErrCode(ce), nil)
 		cc.wmu.Unlock()
 	}
-	cc.tconn.Close()
 }
 
 func (rl *clientConnReadLoop) cleanup() {
 	cc := rl.cc
+	defer cc.tconn.Close()
 	defer cc.t.removeClientConn(cc)
 	defer close(cc.readerDone)
 
@@ -689,11 +686,16 @@ func (rl *clientConnReadLoop) cleanup() {
 	if err == io.EOF {
 		err = io.ErrUnexpectedEOF
 	}
+	cc.mu.Lock()
 	for _, cs := range rl.activeRes {
 		cs.bufPipe.CloseWithError(err)
 	}
-
-	cc.mu.Lock()
+	for _, cs := range cc.streams {
+		select {
+		case cs.resc <- resAndError{err: err}:
+		default:
+		}
+	}
 	cc.closed = true
 	cc.cond.Broadcast()
 	cc.mu.Unlock()
@@ -726,47 +728,23 @@ func (rl *clientConnReadLoop) run() error {
 			return ConnectionError(ErrCodeProtocol)
 		}
 
-		if streamID%2 == 0 {
-			// Ignore streams pushed from the server for now.
-			// These always have an even stream id.
-			continue
-		}
-
-		// TODO(bradfitz): push all this (streamEnded + lookup
-		// of clientStream) into per-frame methods, so the
-		// cc.mu Lock/Unlock is only done once.
-		streamEnded := false
-		if ff, ok := f.(streamEnder); ok {
-			streamEnded = ff.StreamEnded()
-		}
-		var cs *clientStream
-		if streamID != 0 {
-			cs = cc.streamByID(streamID, streamEnded)
-			if cs == nil {
-				rl.cc.logf("Received frame for untracked stream ID %d", streamID)
-			}
-		}
-
 		switch f := f.(type) {
 		case *HeadersFrame:
-			err = rl.processHeaders(f, cs)
+			err = rl.processHeaders(f)
 		case *ContinuationFrame:
-			err = rl.processContinuation(f, cs)
+			err = rl.processContinuation(f)
 		case *DataFrame:
-			err = rl.processData(f, cs)
+			err = rl.processData(f)
 		case *GoAwayFrame:
 			err = rl.processGoAway(f)
 		case *RSTStreamFrame:
-			err = rl.processResetStream(f, cs)
+			err = rl.processResetStream(f)
 		case *SettingsFrame:
 			err = rl.processSettings(f)
 		case *PushPromiseFrame:
-			// Skip it. We told the peer we don't want
-			// them anyway. And we already skipped even
-			// stream IDs above. So really shouldn't be
-			// here.
+			err = rl.processPushPromise(f)
 		case *WindowUpdateFrame:
-			err = rl.processWindowUpdate(f, cs)
+			err = rl.processWindowUpdate(f)
 		default:
 			cc.logf("Transport: unhandled response frame type %T", f)
 		}
@@ -776,7 +754,7 @@ func (rl *clientConnReadLoop) run() error {
 	}
 }
 
-func (rl *clientConnReadLoop) processHeaders(f *HeadersFrame, cs *clientStream) error {
+func (rl *clientConnReadLoop) processHeaders(f *HeadersFrame) error {
 	rl.sawRegHeader = false
 	rl.reqMalformed = nil
 	rl.nextRes = &http.Response{
@@ -784,15 +762,24 @@ func (rl *clientConnReadLoop) processHeaders(f *HeadersFrame, cs *clientStream)
 		ProtoMajor: 2,
 		Header:     make(http.Header),
 	}
-	return rl.processHeaderBlockFragment(cs, f.HeaderBlockFragment(), f.HeadersEnded(), f.StreamEnded())
+	return rl.processHeaderBlockFragment(f.HeaderBlockFragment(), f.StreamID, f.HeadersEnded(), f.StreamEnded())
 }
 
-func (rl *clientConnReadLoop) processContinuation(f *ContinuationFrame, cs *clientStream) error {
-	return rl.processHeaderBlockFragment(cs, f.HeaderBlockFragment(), f.HeadersEnded(), f.StreamEnded())
+func (rl *clientConnReadLoop) processContinuation(f *ContinuationFrame) error {
+	return rl.processHeaderBlockFragment(f.HeaderBlockFragment(), f.StreamID, f.HeadersEnded(), f.StreamEnded())
 }
 
-func (rl *clientConnReadLoop) processHeaderBlockFragment(cs *clientStream, frag []byte, headersEnded, streamEnded bool) error {
+func (rl *clientConnReadLoop) processHeaderBlockFragment(frag []byte, streamID uint32, headersEnded, streamEnded bool) error {
+	cc := rl.cc
+	cs := cc.streamByID(streamID, streamEnded)
 	if cs == nil {
+		// We could return a ConnectionError(ErrCodeProtocol)
+		// here, except that in the case of us canceling
+		// client requests, we may also delete from the
+		// streams map, in which case we forgot that we sent
+		// this request. So, just ignore any responses for
+		// now.  They might've been in-flight before the
+		// server got our RST_STREAM.
 		return nil
 	}
 	_, err := rl.hdec.Write(frag)
@@ -816,7 +803,6 @@ func (rl *clientConnReadLoop) processHeaderBlockFragment(cs *clientStream, frag
 	res := rl.nextRes
 	if streamEnded {
 		res.Body = noBody
-		cs.ended = true
 	} else {
 		buf := new(bytes.Buffer) // TODO(bradfitz): recycle this garbage
 		cs.bufPipe = pipe{b: buf}
@@ -880,19 +866,16 @@ func (b transportResponseBody) Close() error {
 	return nil
 }
 
-func (rl *clientConnReadLoop) processData(f *DataFrame, cs *clientStream) error {
+func (rl *clientConnReadLoop) processData(f *DataFrame) error {
+	cc := rl.cc
+	cs := cc.streamByID(f.StreamID, f.StreamEnded())
 	if cs == nil {
 		return nil
 	}
-	if cs.ended {
-		// TODO: add test for this (DATA frame after STREAM_ENDED cases)
-		return ConnectionError(ErrCodeProtocol)
-	}
 	data := f.Data()
 	if VerboseLogs {
 		rl.cc.logf("DATA: %q", data)
 	}
-	cc := rl.cc
 
 	// Check connection-level flow control.
 	cc.mu.Lock()
@@ -907,10 +890,8 @@ func (rl *clientConnReadLoop) processData(f *DataFrame, cs *clientStream) error
 	if _, err := cs.bufPipe.Write(data); err != nil {
 		return err
 	}
-	// send WINDOW_UPDATE frames occasionally as per-stream flow control depletes
 
 	if f.StreamEnded() {
-		cs.ended = true
 		cs.bufPipe.CloseWithError(io.EOF)
 		delete(rl.activeRes, cs.ID)
 	}
@@ -953,11 +934,12 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
 	})
 }
 
-func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame, cs *clientStream) error {
+func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
+	cc := rl.cc
+	cs := cc.streamByID(f.StreamID, false)
 	if f.StreamID != 0 && cs == nil {
 		return nil
 	}
-	cc := rl.cc
 
 	cc.mu.Lock()
 	defer cc.mu.Unlock()
@@ -973,9 +955,10 @@ func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame, cs *clie
 	return nil
 }
 
-func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame, cs *clientStream) error {
+func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
+	cs := rl.cc.streamByID(f.StreamID, true)
 	if cs == nil {
-		// TODO: return error if rst of idle stream?
+		// TODO: return error if server tries to RST_STEAM an idle stream
 		return nil
 	}
 	select {
@@ -994,6 +977,17 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame, cs *clientSt
 	return nil
 }
 
+func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
+	// We told the peer we don't want them.
+	// Spec says:
+	// "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
+	// setting of the peer endpoint is set to 0. An endpoint that
+	// has set this setting and has received acknowledgement MUST
+	// treat the receipt of a PUSH_PROMISE frame as a connection
+	// error (Section 5.4.1) of type PROTOCOL_ERROR."
+	return ConnectionError(ErrCodeProtocol)
+}
+
 func (cc *clientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
 	// TODO: do something with err? send it as a debug frame to the peer?
 	// But that's only in GOAWAY. Invent a new frame type? Is there one already?