Browse Source

A basic working responseWriter. Initial tests now pass.

Brad Fitzpatrick 11 years ago
parent
commit
b3648114a0
2 changed files with 157 additions and 19 deletions
  1. 141 16
      http2.go
  2. 16 3
      http2_test.go

+ 141 - 16
http2.go

@@ -23,7 +23,6 @@ import (
 	"io"
 	"log"
 	"net/http"
-	"net/http/httptest"
 	"net/url"
 	"strconv"
 	"strings"
@@ -35,6 +34,10 @@ const (
 	// ClientPreface is the string that must be sent by new
 	// connections from clients.
 	ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
+
+	// SETTINGS_MAX_FRAME_SIZE default
+	// http://http2.github.io/http2-spec/#rfc.section.6.5.2
+	initialMaxFrameSize = 16384
 )
 
 var (
@@ -56,16 +59,19 @@ type Server struct {
 
 func (srv *Server) handleConn(hs *http.Server, c *tls.Conn, h http.Handler) {
 	sc := &serverConn{
-		hs:             hs,
-		conn:           c,
-		handler:        h,
-		framer:         NewFramer(c, c),
-		streams:        make(map[uint32]*stream),
-		canonHeader:    make(map[string]string),
-		readFrameCh:    make(chan frameAndProcessed),
-		readFrameErrCh: make(chan error, 1),
-		doneServing:    make(chan struct{}),
+		hs:                hs,
+		conn:              c,
+		handler:           h,
+		framer:            NewFramer(c, c),
+		streams:           make(map[uint32]*stream),
+		canonHeader:       make(map[string]string),
+		readFrameCh:       make(chan frameAndProcessed),
+		readFrameErrCh:    make(chan error, 1),
+		writeHeaderCh:     make(chan headerWriteReq), // must not be buffered
+		doneServing:       make(chan struct{}),
+		maxWriteFrameSize: initialMaxFrameSize,
 	}
+	sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
 	sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
 	sc.serve()
 }
@@ -87,16 +93,23 @@ type serverConn struct {
 	doneServing    chan struct{}          // closed when serverConn.serve ends
 	readFrameCh    chan frameAndProcessed // written by serverConn.readFrames
 	readFrameErrCh chan error
+	writeHeaderCh  chan headerWriteReq // must not be buffered
 
 	maxStreamID uint32 // max ever seen
 	streams     map[uint32]*stream
 
+	maxWriteFrameSize uint32 // TODO: update this when settings come in
+
 	// State related to parsing current headers:
-	hpackDecoder *hpack.Decoder
-	header       http.Header
-	canonHeader  map[string]string // http2-lower-case -> Go-Canonical-Case
+	hpackDecoder      *hpack.Decoder
+	header            http.Header
+	canonHeader       map[string]string // http2-lower-case -> Go-Canonical-Case
+	method, path      string
+	scheme, authority string
 
-	method, path, scheme, authority string
+	// State related to writing current headers:
+	hpackEncoder   *hpack.Encoder
+	headerWriteBuf bytes.Buffer
 
 	// curHeaderStreamID is non-zero if we're in the middle
 	// of parsing headers that span multiple frames.
@@ -213,6 +226,12 @@ func (sc *serverConn) serve() {
 
 	for {
 		select {
+		case hr := <-sc.writeHeaderCh:
+			if err := sc.writeHeaderInLoop(hr); err != nil {
+				// TODO: diff error handling?
+				sc.logf("error writing response header: %v", err)
+				return
+			}
 		case fp, ok := <-sc.readFrameCh:
 			if !ok {
 				err := <-sc.readFrameErrCh
@@ -354,9 +373,60 @@ func (sc *serverConn) startHandler(streamID uint32, bodyOpen bool, method, path,
 	} else {
 		req.ContentLength = -1
 	}
-	var rw = httptest.NewRecorder()
+	rw := &responseWriter{
+		sc:       sc,
+		streamID: streamID,
+	}
+	defer rw.handlerDone()
+	// TODO: catch panics like net/http.Server
 	sc.handler.ServeHTTP(rw, req)
-	log.Printf("Got code %d: body: %q\n", rw.Code, rw.Body.Bytes())
+}
+
+// called from handler goroutines
+func (sc *serverConn) writeData(streamID uint32, p []byte) (n int, err error) {
+	// TODO: implement
+	log.Printf("WRITE on %d: %q", streamID, p)
+	return len(p), nil
+}
+
+// headerWriteReq is a request to write an HTTP response header from a server Handler.
+type headerWriteReq struct {
+	streamID    uint32
+	httpResCode int
+	h           http.Header // may be nil
+	endStream   bool
+}
+
+// called from handler goroutines.
+// h may be nil.
+func (sc *serverConn) writeHeader(req headerWriteReq) {
+	sc.writeHeaderCh <- req
+}
+
+// called from serverConn.serve loop.
+func (sc *serverConn) writeHeaderInLoop(req headerWriteReq) error {
+	sc.headerWriteBuf.Reset()
+	// TODO: remove this strconv
+	sc.hpackEncoder.WriteField(hpack.HeaderField{Name: ":status", Value: strconv.Itoa(req.httpResCode)})
+	for k, vv := range req.h {
+		for _, v := range vv {
+			// TODO: for gargage, cache lowercase copies of headers at
+			// least for common ones and/or popular recent ones for
+			// this serverConn. LRU?
+			sc.hpackEncoder.WriteField(hpack.HeaderField{Name: strings.ToLower(k), Value: v})
+		}
+	}
+	headerBlock := sc.headerWriteBuf.Bytes()
+	if len(headerBlock) > int(sc.maxWriteFrameSize) {
+		// we'll need continuation ones.
+		panic("TODO")
+	}
+	return sc.framer.WriteHeaders(HeadersFrameParam{
+		StreamID:      req.streamID,
+		BlockFragment: headerBlock,
+		EndStream:     req.endStream,
+		EndHeaders:    true, // no continuation yet
+	})
 }
 
 // ConfigureServer adds HTTP/2 support to a net/http Server.
@@ -413,4 +483,59 @@ func (b *requestBody) Read(p []byte) (n int, err error) {
 	return 0, errors.New("TODO: we don't handle request bodies yet")
 }
 
+type responseWriter struct {
+	sc           *serverConn
+	streamID     uint32
+	wroteHeaders bool
+	h            http.Header
+}
+
+// TODO: bufio writing of responseWriter. add Flush, add pools of
+// bufio.Writers, adjust bufio writer sized based on frame size
+// updates from peer? For now: naive.
+
+func (w *responseWriter) Header() http.Header {
+	if w.h == nil {
+		w.h = make(http.Header)
+	}
+	return w.h
+}
+
+func (w *responseWriter) WriteHeader(code int) {
+	if w.wroteHeaders {
+		return
+	}
+	// TODO: defer actually writing this frame until a Flush or
+	// handlerDone, like net/http's Server. then we can coalesce
+	// e.g. a 204 response to have a Header response frame with
+	// END_STREAM set, without a separate frame being sent in
+	// handleDone.
+	w.wroteHeaders = true
+	w.sc.writeHeader(headerWriteReq{
+		streamID:    w.streamID,
+		httpResCode: code,
+		h:           w.h,
+	})
+}
+
+// TODO: responseWriter.WriteString too?
+
+func (w *responseWriter) Write(p []byte) (n int, err error) {
+	if !w.wroteHeaders {
+		w.WriteHeader(200)
+	}
+	return w.sc.writeData(w.streamID, p) // blocks waiting for tokens
+}
+
+func (w *responseWriter) handlerDone() {
+	if !w.wroteHeaders {
+		w.sc.writeHeader(headerWriteReq{
+			streamID:    w.streamID,
+			httpResCode: 200,
+			h:           w.h,
+			endStream:   true, // handler has finished; can't be any data.
+		})
+	}
+}
+
 var testHookOnConn func() // for testing

+ 16 - 3
http2_test.go

@@ -10,7 +10,6 @@ package http2
 import (
 	"errors"
 	"fmt"
-	"io"
 	"net/http"
 	"net/http/httptest"
 	"os/exec"
@@ -25,7 +24,18 @@ func TestServer(t *testing.T) {
 	requireCurl(t)
 
 	ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		io.WriteString(w, "Hello, test.")
+		// TODO: add a bunch of different tests with different
+		// behavior, as a function of r or a table.
+		// -- with request body, without.
+		// -- no interaction with w.
+		// -- panic
+		// -- modify Header only, but no writes or writeheader (this test)
+		// -- WriteHeader only
+		// -- Write only
+		// -- WriteString
+		// -- both
+		// Look at net/http's Server tests for inspiration.
+		w.Header().Set("Foo", "Bar")
 	}))
 	ConfigureServer(ts.Config, &Server{})
 	ts.TLS = ts.Config.TLSConfig // the httptest.Server has its own copy of this TLS config
@@ -52,7 +62,10 @@ func TestServer(t *testing.T) {
 		if err, ok := res.(error); ok {
 			t.Fatal(err)
 		}
-		t.Logf("Got: %s", res)
+		if !strings.Contains(string(res.([]byte)), "< foo:Bar") {
+			t.Errorf("didn't see foo:Bar header")
+			t.Logf("Got: %s", res)
+		}
 	case <-time.After(3 * time.Second):
 		t.Errorf("timeout waiting for curl")
 	}