Selaa lähdekoodia

http2: add per-Response buffered response bodies with separate flow control

Change-Id: I795d230b78b43aea4c2088b1d04c927b2418a7a3
Reviewed-on: https://go-review.googlesource.com/16333
Reviewed-by: Blake Mizerany <blake.mizerany@gmail.com>
Brad Fitzpatrick 10 vuotta sitten
vanhempi
commit
6281f06c8c
2 muutettua tiedostoa jossa 54 lisäystä ja 15 poistoa
  1. 8 0
      http2/pipe.go
  2. 46 15
      http2/transport.go

+ 8 - 0
http2/pipe.go

@@ -80,3 +80,11 @@ func (p *pipe) CloseWithError(err error) {
 		p.err = err
 		p.err = err
 	}
 	}
 }
 }
+
+// Err returns the error (if any) first set with CloseWithError.
+// This is the error which will be returned after the reader is exhausted.
+func (p *pipe) Err() error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return p.err
+}

+ 46 - 15
http2/transport.go

@@ -19,6 +19,7 @@ import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
+	"io/ioutil"
 	"log"
 	"log"
 	"net"
 	"net"
 	"net/http"
 	"net/http"
@@ -91,11 +92,13 @@ type clientConn struct {
 }
 }
 
 
 type clientStream struct {
 type clientStream struct {
-	cc   *clientConn
-	ID   uint32
-	resc chan resAndError
-	pw   *io.PipeWriter
-	pr   *io.PipeReader
+	cc      *clientConn
+	ID      uint32
+	resc    chan resAndError
+	bufPipe pipe
+
+	// Owned by readLoop goroutine:
+	ended bool // on STREAM_ENDED from any of HEADERS/CONTINUATION/DATA
 
 
 	flow   flow // guarded by cc.mu
 	flow   flow // guarded by cc.mu
 	inflow flow // guarded by cc.mu
 	inflow flow // guarded by cc.mu
@@ -685,7 +688,7 @@ func (rl *clientConnReadLoop) cleanup() {
 		err = io.ErrUnexpectedEOF
 		err = io.ErrUnexpectedEOF
 	}
 	}
 	for _, cs := range rl.activeRes {
 	for _, cs := range rl.activeRes {
-		cs.pw.CloseWithError(err)
+		cs.bufPipe.CloseWithError(err)
 	}
 	}
 
 
 	cc.mu.Lock()
 	cc.mu.Lock()
@@ -779,7 +782,6 @@ func (rl *clientConnReadLoop) processHeaders(f *HeadersFrame, cs *clientStream)
 		ProtoMajor: 2,
 		ProtoMajor: 2,
 		Header:     make(http.Header),
 		Header:     make(http.Header),
 	}
 	}
-	cs.pr, cs.pw = io.Pipe()
 	return rl.processHeaderBlockFragment(cs, f.HeaderBlockFragment(), f.HeadersEnded(), f.StreamEnded())
 	return rl.processHeaderBlockFragment(cs, f.HeaderBlockFragment(), f.HeadersEnded(), f.StreamEnded())
 }
 }
 
 
@@ -809,23 +811,49 @@ func (rl *clientConnReadLoop) processHeaderBlockFragment(cs *clientStream, frag
 		return nil
 		return nil
 	}
 	}
 
 
-	// TODO: set the Body to one which notes the
-	// Close and also sends the server a
-	// RST_STREAM
-	rl.nextRes.Body = cs.pr
 	res := rl.nextRes
 	res := rl.nextRes
+	if streamEnded {
+		res.Body = noBody
+		cs.ended = true
+	} else {
+		buf := new(bytes.Buffer) // TODO(bradfitz): recycle this garbage
+		cs.bufPipe = pipe{b: buf}
+		res.Body = transportResponseBody{cs}
+	}
 	rl.activeRes[cs.ID] = cs
 	rl.activeRes[cs.ID] = cs
 	cs.resc <- resAndError{res: res}
 	cs.resc <- resAndError{res: res}
 	rl.nextRes = nil // unused now; will be reset next HEADERS frame
 	rl.nextRes = nil // unused now; will be reset next HEADERS frame
 	return nil
 	return nil
 }
 }
 
 
+// transportResponseBody is the concrete type of Transport.RoundTrip's
+// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
+// On Close it sends RST_STREAM if EOF wasn't already seen.
+type transportResponseBody struct {
+	cs *clientStream
+}
+
+func (b transportResponseBody) Read(p []byte) (n int, err error) {
+	return b.cs.bufPipe.Read(p)
+}
+
+func (b transportResponseBody) Close() error {
+	if b.cs.bufPipe.Err() != io.EOF {
+		// TODO: write test for this
+		b.cs.cc.writeStreamReset(b.cs.ID, ErrCodeCancel, nil)
+	}
+	return nil
+}
+
 func (rl *clientConnReadLoop) processData(f *DataFrame, cs *clientStream) error {
 func (rl *clientConnReadLoop) processData(f *DataFrame, cs *clientStream) error {
 	if cs == nil {
 	if cs == nil {
 		return nil
 		return nil
 	}
 	}
+	if cs.ended {
+		// TODO: add test for this (DATA frame after STREAM_ENDED cases)
+		return ConnectionError(ErrCodeProtocol)
+	}
 	data := f.Data()
 	data := f.Data()
-	// TODO: decrement cs.inflow and cc.inflow, sending errors as appropriate.
 	if VerboseLogs {
 	if VerboseLogs {
 		rl.cc.logf("DATA: %q", data)
 		rl.cc.logf("DATA: %q", data)
 	}
 	}
@@ -841,13 +869,14 @@ func (rl *clientConnReadLoop) processData(f *DataFrame, cs *clientStream) error
 	}
 	}
 	cc.mu.Unlock()
 	cc.mu.Unlock()
 
 
-	if _, err := cs.pw.Write(data); err != nil {
+	if _, err := cs.bufPipe.Write(data); err != nil {
 		return err
 		return err
 	}
 	}
 	// send WINDOW_UPDATE frames occasionally as per-stream flow control depletes
 	// send WINDOW_UPDATE frames occasionally as per-stream flow control depletes
 
 
 	if f.StreamEnded() {
 	if f.StreamEnded() {
-		cs.pw.Close()
+		cs.ended = true
+		cs.bufPipe.CloseWithError(io.EOF)
 		delete(rl.activeRes, cs.ID)
 		delete(rl.activeRes, cs.ID)
 	}
 	}
 	return nil
 	return nil
@@ -924,7 +953,7 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame, cs *clientSt
 		err := StreamError{cs.ID, f.ErrCode}
 		err := StreamError{cs.ID, f.ErrCode}
 		cs.resetErr = err
 		cs.resetErr = err
 		close(cs.peerReset)
 		close(cs.peerReset)
-		cs.pw.CloseWithError(err)
+		cs.bufPipe.CloseWithError(err)
 	}
 	}
 	delete(rl.activeRes, cs.ID)
 	delete(rl.activeRes, cs.ID)
 	return nil
 	return nil
@@ -988,3 +1017,5 @@ func (t *Transport) vlogf(format string, args ...interface{}) {
 func (t *Transport) logf(format string, args ...interface{}) {
 func (t *Transport) logf(format string, args ...interface{}) {
 	log.Printf(format, args...)
 	log.Printf(format, args...)
 }
 }
+
+var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))