Browse Source

http2: send WINDOW_UPDATE frames while reading Transport Response bodies

Change-Id: I3cbdf6107dfe13a0fb57d677059ca2106b1afb25
Reviewed-on: https://go-review.googlesource.com/16334
Reviewed-by: Blake Mizerany <blake.mizerany@gmail.com>
Brad Fitzpatrick 10 years ago
parent
commit
a7d8d4e9cf
2 changed files with 51 additions and 8 deletions
  1. 7 0
      http2/http2.go
  2. 44 8
      http2/transport.go

+ 7 - 0
http2/http2.go

@@ -244,3 +244,10 @@ func (w *bufferedWriter) Flush() error {
 	w.bw = nil
 	return err
 }
+
+func mustUint31(v int32) uint32 {
+	if v < 0 || v > 2147483647 {
+		panic("out of range")
+	}
+	return uint32(v)
+}

+ 44 - 8
http2/transport.go

@@ -3,12 +3,6 @@
 // license that can be found in the LICENSE file.
 
 // Transport code.
-//
-// TODO: send flow control WINDOW_UPDATE frames as DATA is received,
-// for both stream and conn levels. Currently this code will hang
-// after 1GB is downloaded total.
-// TODO: buffer each stream's data (~few MB), flow controlled separately,
-// so one ignored Response.Body doesn't block all goroutines.
 
 package http2
 
@@ -39,6 +33,10 @@ const (
 	// control tokens we announce to the peer, and how many bytes
 	// we buffer per stream.
 	transportDefaultStreamFlow = 4 << 20
+
+	// transportDefaultStreamMinRefresh is the minimum number of bytes we'll send
+	// a stream-level WINDOW_UPDATE for at a time.
+	transportDefaultStreamMinRefresh = 4 << 10
 )
 
 // Transport is an HTTP/2 Transport.
@@ -91,11 +89,13 @@ type clientConn struct {
 	werr error      // first write error that has occurred
 }
 
+// clientStream is the state for a single HTTP/2 stream. One of these
+// is created for each Transport.RoundTrip call.
 type clientStream struct {
 	cc      *clientConn
 	ID      uint32
 	resc    chan resAndError
-	bufPipe pipe
+	bufPipe pipe // buffered pipe with the flow-controlled response payload
 
 	// Owned by readLoop goroutine:
 	ended bool // on STREAM_ENDED from any of HEADERS/CONTINUATION/DATA
@@ -107,6 +107,8 @@ type clientStream struct {
 	resetErr  error         // populated before peerReset is closed
 }
 
+// checkReset reports any error sent in a RST_STREAM frame by the
+// server.
 func (cs *clientStream) checkReset() error {
 	select {
 	case <-cs.peerReset:
@@ -834,7 +836,40 @@ type transportResponseBody struct {
 }
 
 func (b transportResponseBody) Read(p []byte) (n int, err error) {
-	return b.cs.bufPipe.Read(p)
+	n, err = b.cs.bufPipe.Read(p)
+	if n == 0 {
+		return
+	}
+
+	cs := b.cs
+	cc := cs.cc
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+
+	var connAdd, streamAdd int32
+	// Check the conn-level first, before the stream-level.
+	if v := cc.inflow.available(); v < transportDefaultConnFlow/2 {
+		connAdd = transportDefaultConnFlow - v
+		cc.inflow.add(connAdd)
+	}
+	if err == nil { // No need to refresh if the stream is over or failed.
+		if v := cs.inflow.available(); v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh {
+			streamAdd = transportDefaultStreamFlow - v
+			cs.inflow.add(streamAdd)
+		}
+	}
+	if connAdd != 0 || streamAdd != 0 {
+		cc.wmu.Lock()
+		defer cc.wmu.Unlock()
+		if connAdd != 0 {
+			cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
+		}
+		if streamAdd != 0 {
+			cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
+		}
+		cc.bw.Flush()
+	}
+	return
 }
 
 func (b transportResponseBody) Close() error {
@@ -961,6 +996,7 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame, cs *clientSt
 
 func (cc *clientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
 	// TODO: do something with err? send it as a debug frame to the peer?
+	// But that's only in GOAWAY. Invent a new frame type? Is there one already?
 	cc.wmu.Lock()
 	cc.fr.WriteRSTStream(streamID, code)
 	cc.wmu.Unlock()