Browse Source

Redesign how frames are written.

There's now three goroutines per connection, plus one goroutine per
active http.Handler. The base three are:

1) the "serve" loop coordiantor that owns most state about a conn and
   never blocks.
2) the readFrames loop, blocked reading from the network to read more
   frames from the peer.
3) the writeFrames loop (this one is new), which might also get blocked,
   writing faster than the peer can receive frames.

The http.Handler goroutines now tell the first goroutine (the serve
loop) that they would like a frame written. The serve goroutine acts
as the frame scheduler, deciding how to send frames based on
priorities. It can also stop reading from its channel, causing
pushback to the handlers if they're sending too much. But in practice
the handlers will do their own pushback, since each frame write
request includes a channel to be notified of when it's actually
written. Handlers writing response data, for instance, will wait until
their data is written (especially when they're out of flow control
tokens) before trying to send more.

For now the scheduler is naive, but the design feels right now.
Brad Fitzpatrick 11 years ago
parent
commit
520123b8fc
1 changed files with 172 additions and 77 deletions
  1. 172 77
      server.go

+ 172 - 77
server.go

@@ -78,9 +78,10 @@ func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
 		streams:           make(map[uint32]*stream),
 		canonHeader:       make(map[string]string),
 		readFrameCh:       make(chan frameAndProcessed),
-		readFrameErrCh:    make(chan error, 1),
-		writeHeaderCh:     make(chan headerWriteReq), // must not be buffered
-		windowUpdateCh:    make(chan windowUpdateReq, 8),
+		readFrameErrCh:    make(chan error, 1), // must be buffered for 1
+		wantWriteFrameCh:  make(chan frameWriteMsg, 8),
+		writeFrameCh:      make(chan frameWriteMsg, 1), // may be 0 or 1, but more is useless. (max 1 in flight)
+		wroteFrameCh:      make(chan struct{}, 1),
 		flow:              newFlow(initialWindowSize),
 		doneServing:       make(chan struct{}),
 		maxWriteFrameSize: initialMaxFrameSize,
@@ -103,29 +104,36 @@ type frameAndProcessed struct {
 
 type serverConn struct {
 	// Immutable:
-	hs             *http.Server
-	conn           net.Conn
-	handler        http.Handler
-	framer         *Framer
-	hpackDecoder   *hpack.Decoder
-	hpackEncoder   *hpack.Encoder
-	doneServing    chan struct{}          // closed when serverConn.serve ends
-	readFrameCh    chan frameAndProcessed // written by serverConn.readFrames
-	readFrameErrCh chan error
-	writeHeaderCh  chan headerWriteReq // must not be buffered
-	windowUpdateCh chan windowUpdateReq
-	serveG         goroutineLock // used to verify funcs are on serve()
-	flow           *flow         // the connection-wide one
-
-	// Everything following is owned by the serve loop; use serveG.check()
+	hs               *http.Server
+	conn             net.Conn
+	handler          http.Handler
+	framer           *Framer
+	hpackDecoder     *hpack.Decoder
+	doneServing      chan struct{}          // closed when serverConn.serve ends
+	readFrameCh      chan frameAndProcessed // written by serverConn.readFrames
+	readFrameErrCh   chan error
+	wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
+	writeFrameCh     chan frameWriteMsg // from serve -> writeFrames
+	wroteFrameCh     chan struct{}      // from writeFrames -> serve, tickles more sends on writeFrameCh
+
+	serveG goroutineLock // used to verify funcs are on serve()
+	writeG goroutineLock // used to verify things running on writeLoop
+	flow   *flow         // connection-wide (not stream-specific) flow control
+
+	// Everything following is owned by the serve loop; use serveG.check():
 	maxStreamID       uint32 // max ever seen
 	streams           map[uint32]*stream
 	maxWriteFrameSize uint32 // TODO: update this when settings come in
 	initialWindowSize int32
 	canonHeader       map[string]string // http2-lower-case -> Go-Canonical-Case
 	sentGoAway        bool
-	req               requestParam // non-zero while reading request headers
-	headerWriteBuf    bytes.Buffer // used to write response headers
+	req               requestParam    // non-zero while reading request headers
+	writingFrame      bool            // sent on writeFrameCh but haven't heard back on wroteFrameCh yet
+	writeQueue        []frameWriteMsg // TODO: proper scheduler, not a queue
+
+	// Owned by the writeFrames goroutine; use writeG.check():
+	headerWriteBuf bytes.Buffer
+	hpackEncoder   *hpack.Encoder
 }
 
 // requestParam is the state of the next request, initialized over
@@ -265,8 +273,8 @@ func (sc *serverConn) readFrames() {
 	for {
 		f, err := sc.framer.ReadFrame()
 		if err != nil {
+			sc.readFrameErrCh <- err // BEFORE the close
 			close(sc.readFrameCh)
-			sc.readFrameErrCh <- err
 			return
 		}
 		sc.readFrameCh <- frameAndProcessed{f, processed}
@@ -274,6 +282,20 @@ func (sc *serverConn) readFrames() {
 	}
 }
 
+// writeFrames is the loop that writes frames to the peer
+// and is responsible for prioritization and buffering.
+// It's run on its own goroutine.
+func (sc *serverConn) writeFrames() {
+	sc.writeG = newGoroutineLock()
+	for wm := range sc.writeFrameCh {
+		err := wm.write(sc, wm.v)
+		if ch := wm.done; ch != nil {
+			ch <- err
+		}
+		sc.wroteFrameCh <- struct{}{} // tickle frame selection scheduler
+	}
+}
+
 func (sc *serverConn) serve() {
 	sc.serveG.check()
 	defer sc.conn.Close()
@@ -294,7 +316,7 @@ func (sc *serverConn) serve() {
 	}
 	sc.vlogf("client %v said hello", sc.conn.RemoteAddr())
 
-	f, err := sc.framer.ReadFrame()
+	f, err := sc.framer.ReadFrame() // TODO: timeout
 	if err != nil {
 		sc.logf("error reading initial frame from client: %v", err)
 		return
@@ -325,20 +347,17 @@ func (sc *serverConn) serve() {
 		return
 	}
 
-	go sc.readFrames()
+	go sc.readFrames() // closed by defer sc.conn.Close above
+	go sc.writeFrames()
+	defer close(sc.writeFrameCh) // shuts down writeFrames loop
 
 	for {
 		select {
-		case hr := <-sc.writeHeaderCh:
-			if err := sc.writeHeaderInLoop(hr); err != nil {
-				sc.condlogf(err, "error writing response header: %v", err)
-				return
-			}
-		case wu := <-sc.windowUpdateCh:
-			if err := sc.sendWindowUpdateInLoop(wu); err != nil {
-				sc.condlogf(err, "error writing window update: %v", err)
-				return
-			}
+		case wm := <-sc.wantWriteFrameCh:
+			sc.enqueueFrameWrite(wm)
+		case <-sc.wroteFrameCh:
+			sc.writingFrame = false
+			sc.scheduleFrameWrite()
 		case fp, ok := <-sc.readFrameCh:
 			if !ok {
 				err := <-sc.readFrameErrCh
@@ -378,6 +397,30 @@ func (sc *serverConn) serve() {
 	}
 }
 
+func (sc *serverConn) enqueueFrameWrite(wm frameWriteMsg) {
+	sc.serveG.check()
+	// Fast path for common case:
+	if !sc.writingFrame {
+		sc.writingFrame = true
+		sc.writeFrameCh <- wm
+		return
+	}
+	sc.writeQueue = append(sc.writeQueue, wm) // TODO: proper scheduler
+}
+
+func (sc *serverConn) scheduleFrameWrite() {
+	sc.serveG.check()
+	if len(sc.writeQueue) == 0 {
+		// TODO: flush Framer's underlying buffered writer, once that's added
+		return
+	}
+	// TODO: proper scheduler
+	wm := sc.writeQueue[0]
+	copy(sc.writeQueue, sc.writeQueue[1:]) // shift it all down. kinda lame. will be removed later anyway.
+	sc.writingFrame = true
+	sc.writeFrameCh <- wm
+}
+
 func (sc *serverConn) goAway(code ErrCode) error {
 	sc.serveG.check()
 	sc.sentGoAway = true
@@ -693,11 +736,14 @@ func (sc *serverConn) newWriterAndRequest() (*responseWriter, *http.Request, err
 	}
 
 	rws := responseWriterStatePool.Get().(*responseWriterState)
+	wbufSave := rws.wbuf
+	*rws = responseWriterState{} // zero all the fields
+	rws.wbuf = wbufSave
+	rws.wbuf.Reset()
 	rws.sc = sc
 	rws.streamID = rp.stream.id
 	rws.req = req
 	rws.body = body
-	rws.wbuf.Reset()
 
 	rw := &responseWriter{rws: rws}
 	return rw, req, nil
@@ -705,7 +751,9 @@ func (sc *serverConn) newWriterAndRequest() (*responseWriter, *http.Request, err
 
 var responseWriterStatePool = sync.Pool{
 	New: func() interface{} {
-		return new(responseWriterState)
+		return &responseWriterState{
+			wbuf: new(bytes.Buffer),
+		}
 	},
 }
 
@@ -723,22 +771,45 @@ func (sc *serverConn) writeData(streamID uint32, p []byte) (n int, err error) {
 	return len(p), nil
 }
 
+type frameWriteMsg struct {
+	// write runs on the writeFrames goroutine.
+	write func(sc *serverConn, v interface{}) error
+
+	v        interface{} // passed to write
+	cost     uint32      // number of flow control bytes required
+	streamID uint32      // used for prioritization
+
+	// done, if non-nil, must be a buffered channel with space for
+	// 1 message and is sent the return value from write (or an
+	// earlier error) when the frame has been written.
+	done chan<- error
+}
+
 // headerWriteReq is a request to write an HTTP response header from a server Handler.
 type headerWriteReq struct {
 	streamID    uint32
 	httpResCode int
 	h           http.Header // may be nil
 	endStream   bool
+
+	contentType   string
+	contentLength string
 }
 
 // called from handler goroutines.
 // h may be nil.
 func (sc *serverConn) writeHeader(req headerWriteReq) {
-	sc.writeHeaderCh <- req
+	sc.wantWriteFrameCh <- frameWriteMsg{
+		write:    (*serverConn).writeHeaderInLoop,
+		v:        req,
+		streamID: req.streamID,
+	}
 }
 
-func (sc *serverConn) writeHeaderInLoop(req headerWriteReq) error {
-	sc.serveG.check()
+func (sc *serverConn) writeHeaderInLoop(v interface{}) error {
+	sc.writeG.check()
+	req := v.(headerWriteReq)
+
 	sc.headerWriteBuf.Reset()
 	sc.hpackEncoder.WriteField(hpack.HeaderField{Name: ":status", Value: httpCodeString(req.httpResCode)})
 	for k, vv := range req.h {
@@ -749,6 +820,13 @@ func (sc *serverConn) writeHeaderInLoop(req headerWriteReq) error {
 			sc.hpackEncoder.WriteField(hpack.HeaderField{Name: strings.ToLower(k), Value: v})
 		}
 	}
+	if req.contentType != "" {
+		sc.hpackEncoder.WriteField(hpack.HeaderField{Name: "content-type", Value: req.contentType})
+	}
+	if req.contentLength != "" {
+		sc.hpackEncoder.WriteField(hpack.HeaderField{Name: "content-length", Value: req.contentLength})
+	}
+
 	headerBlock := sc.headerWriteBuf.Bytes()
 	if len(headerBlock) > int(sc.maxWriteFrameSize) {
 		// we'll need continuation ones.
@@ -771,24 +849,31 @@ type windowUpdateReq struct {
 func (sc *serverConn) sendWindowUpdate(streamID uint32, n int) {
 	const maxUint32 = 2147483647
 	for n >= maxUint32 {
-		sc.windowUpdateCh <- windowUpdateReq{streamID, maxUint32}
+		sc.wantWriteFrameCh <- frameWriteMsg{
+			write:    (*serverConn).sendWindowUpdateInLoop,
+			v:        windowUpdateReq{streamID, maxUint32},
+			streamID: streamID,
+		}
 		n -= maxUint32
 	}
 	if n > 0 {
-		sc.windowUpdateCh <- windowUpdateReq{streamID, uint32(n)}
+		sc.wantWriteFrameCh <- frameWriteMsg{
+			write:    (*serverConn).sendWindowUpdateInLoop,
+			v:        windowUpdateReq{streamID, uint32(n)},
+			streamID: streamID,
+		}
 	}
 }
 
-func (sc *serverConn) sendWindowUpdateInLoop(wu windowUpdateReq) error {
-	sc.serveG.check()
-	// TODO: sc.bufferedOutput.StartBuffering()
+func (sc *serverConn) sendWindowUpdateInLoop(v interface{}) error {
+	sc.writeG.check()
+	wu := v.(windowUpdateReq)
 	if err := sc.framer.WriteWindowUpdate(0, wu.n); err != nil {
 		return err
 	}
 	if err := sc.framer.WriteWindowUpdate(wu.streamID, wu.n); err != nil {
 		return err
 	}
-	// TODO: return sc.bufferedOutput.Flush()
 	return nil
 }
 
@@ -838,13 +923,17 @@ type responseWriterState struct {
 	req      *http.Request
 	body     *requestBody // to close at end of request, if DATA frames didn't
 
-	wbuf bytes.Buffer
+	// TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
+	wbuf *bytes.Buffer
 
 	// mutated by http.Handler goroutine:
-	h               http.Header // h goes from maybe-nil to non-nil; contents changed by http.Handler goroutine
-	wroteHeaders    bool        // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
-	writeHeaderCode int
-	calledHeader    bool
+	h             http.Header // h goes from maybe-nil to non-nil; contents changed by http.Handler goroutine
+	wroteHeader   bool        // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
+	status        int         // status code passed to WriteHeader
+	wroteContinue bool        // 100 Continue response was written
+	calledHeader  bool
+	sentHeader    bool // have we sent the header frame?
+	handlerDone   bool // handler has finished.
 }
 
 // Optional http.ResponseWriter interfaces implemented.
@@ -855,12 +944,33 @@ var (
 )
 
 func (w *responseWriter) Flush() {
-	// TODO: implement
-}
+	rws := w.rws
+	if rws == nil {
+		panic("Header called after Handler finished")
+	}
+	if !rws.wroteHeader {
+		w.WriteHeader(200)
+	}
+	if !rws.sentHeader {
+		rws.sentHeader = true
+		var ctype, clen string // implicit ones, if we can calculate it
+		if rws.handlerDone && rws.h.Get("Content-Length") == "" {
+			clen = strconv.Itoa(rws.wbuf.Len())
+		}
+		if rws.h.Get("Content-Type") == "" {
+			ctype = http.DetectContentType(rws.wbuf.Bytes())
+		}
+		rws.sc.writeHeader(headerWriteReq{
+			streamID:      rws.streamID,
+			httpResCode:   rws.status,
+			h:             rws.h,
+			endStream:     rws.wbuf.Len() == 0,
+			contentType:   ctype,
+			contentLength: clen,
+		})
+	}
 
-// TODO: bufio writing of responseWriter. add Flush, add pools of
-// bufio.Writers, adjust bufio writer sized based on frame size
-// updates from peer? For now: naive.
+}
 
 func (w *responseWriter) Header() http.Header {
 	rws := w.rws
@@ -879,21 +989,11 @@ func (w *responseWriter) WriteHeader(code int) {
 	if rws == nil {
 		panic("WriteHeader called after Handler finished")
 	}
-	if rws.wroteHeaders {
+	if rws.wroteHeader {
 		return
 	}
-	rws.wroteHeaders = true
-	rws.writeHeaderCode = code
-	// TODO: defer actually writing this frame until a Flush or
-	// handlerDone, like net/http's Server. then we can coalesce
-	// e.g. a 204 response to have a Header response frame with
-	// END_STREAM set, without a separate frame being sent in
-	// handleDone.
-	rws.sc.writeHeader(headerWriteReq{
-		streamID:    rws.streamID,
-		httpResCode: code,
-		h:           rws.h,
-	})
+	rws.wroteHeader = true
+	rws.status = code
 }
 
 // The Life Of A Write is like this:
@@ -913,7 +1013,7 @@ func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int,
 	if rws == nil {
 		panic("Write called after Handler finished")
 	}
-	if !rws.wroteHeaders {
+	if !rws.wroteHeader {
 		w.WriteHeader(200)
 	}
 	// TODO: write to a bufio.Writer instead like the
@@ -930,14 +1030,9 @@ func (w *responseWriter) handlerDone() {
 	if rws == nil {
 		panic("handlerDone called twice")
 	}
+	rws.handlerDone = true
+	w.Flush()
+
 	w.rws = nil
-	if !rws.wroteHeaders {
-		rws.sc.writeHeader(headerWriteReq{
-			streamID:    rws.streamID,
-			httpResCode: 200,
-			h:           rws.h,
-			endStream:   true, // handler has finished; can't be any data.
-		})
-	}
-	// TODO: recycle rws back into a pool
+	responseWriterStatePool.Put(rws)
 }