Browse Source

Move server's WINDOW_UPDATE sending logic into the serve loop.

Previously the http.Handler (via requestBody) were just blinding
writing telling the serve loop to send the WINDOW_UPDATE frames, but
the serve loop didn't really know what was going on.

Now the requestBody.Read instead tells the serve loop that n bytes
were read on a given stream, so the server can be smarter.

This opens the door to coalescing updates, and suppressing pointless
ones (like the new TODO notes: no need to send a stream-specific
WINDOW_UPDATE when the peer has closed their side already)

No (intentional) changes in behavior in this change, though.
Brad Fitzpatrick 11 years ago
parent
commit
c8bab6aad6
2 changed files with 45 additions and 20 deletions
  1. 43 11
      server.go
  2. 2 9
      write.go

+ 43 - 11
server.go

@@ -166,6 +166,7 @@ func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
 		readFrameErrCh:   make(chan error, 1), // must be buffered for 1
 		readFrameErrCh:   make(chan error, 1), // must be buffered for 1
 		wantWriteFrameCh: make(chan frameWriteMsg, 8),
 		wantWriteFrameCh: make(chan frameWriteMsg, 8),
 		wroteFrameCh:     make(chan struct{}, 1), // buffered; one send in reading goroutine
 		wroteFrameCh:     make(chan struct{}, 1), // buffered; one send in reading goroutine
+		bodyReadCh:       make(chan bodyReadMsg), // buffering doesn't matter either way
 		doneServing:      make(chan struct{}),
 		doneServing:      make(chan struct{}),
 		advMaxStreams:    srv.maxConcurrentStreams(),
 		advMaxStreams:    srv.maxConcurrentStreams(),
 		writeSched: writeScheduler{
 		writeSched: writeScheduler{
@@ -214,6 +215,7 @@ type serverConn struct {
 	readFrameErrCh   chan error
 	readFrameErrCh   chan error
 	wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
 	wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
 	wroteFrameCh     chan struct{}      // from writeFrameAsync -> serve, tickles more frame writes
 	wroteFrameCh     chan struct{}      // from writeFrameAsync -> serve, tickles more frame writes
+	bodyReadCh       chan bodyReadMsg   // from handlers -> serve
 	testHookCh       chan func()        // code to run on the serve loop
 	testHookCh       chan func()        // code to run on the serve loop
 	flow             flow               // connection-wide (not stream-specific) flow control
 	flow             flow               // connection-wide (not stream-specific) flow control
 
 
@@ -504,6 +506,8 @@ func (sc *serverConn) serve() {
 				settingsTimer.Stop()
 				settingsTimer.Stop()
 				settingsTimer.C = nil
 				settingsTimer.C = nil
 			}
 			}
+		case m := <-sc.bodyReadCh:
+			sc.noteBodyRead(m.st, m.n)
 		case <-settingsTimer.C:
 		case <-settingsTimer.C:
 			sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
 			sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
 			return
 			return
@@ -1307,22 +1311,50 @@ func (sc *serverConn) write100ContinueHeaders(st *stream) {
 	})
 	})
 }
 }
 
 
-// called from handler goroutines
+// A bodyReadMsg tells the server loop that the http.Handler read n
+// bytes of the DATA from the client on the given stream.
+type bodyReadMsg struct {
+	st *stream
+	n  int
+}
+
+// called from handler goroutines.
+// Notes that the handler for the given stream ID read n bytes of its body
+// and schedules flow control tokens to be sent.
+func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int) {
+	sc.serveG.checkNotOn() // NOT on
+	sc.bodyReadCh <- bodyReadMsg{st, n}
+}
+
+func (sc *serverConn) noteBodyRead(st *stream, n int) {
+	sc.serveG.check()
+	sc.sendWindowUpdate(nil, n) // conn-level
+	// TODO: don't send this WINDOW_UPDATE if the stream is in
+	// stateClosedRemote.  No need to tell them they can send more
+	// if they've already said they're done.
+	sc.sendWindowUpdate(st, n)
+}
+
+// st may be nil for conn-level
 func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
 func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
-	if st == nil {
-		panic("no stream")
+	sc.serveG.check()
+	// "The legal range for the increment to the flow control
+	// window is 1 to 2^31-1 (2,147,483,647) octets."
+	var streamID uint32
+	if st != nil {
+		streamID = st.id
 	}
 	}
-	const maxUint32 = 2147483647
-	for n >= maxUint32 {
-		sc.writeFrameFromHandler(frameWriteMsg{
-			write:  writeWindowUpdate{streamID: st.id, n: maxUint32},
+	const maxUint31 = 1<<31 - 1
+	for n >= maxUint31 {
+		sc.writeFrame(frameWriteMsg{
+			write:  writeWindowUpdate{streamID: streamID, n: maxUint31},
 			stream: st,
 			stream: st,
 		})
 		})
-		n -= maxUint32
+		n -= maxUint31
 	}
 	}
 	if n > 0 {
 	if n > 0 {
-		sc.writeFrameFromHandler(frameWriteMsg{
-			write:  writeWindowUpdate{streamID: st.id, n: uint32(n)},
+		sc.writeFrame(frameWriteMsg{
+			write:  writeWindowUpdate{streamID: streamID, n: uint32(n)},
 			stream: st,
 			stream: st,
 		})
 		})
 	}
 	}
@@ -1354,7 +1386,7 @@ func (b *requestBody) Read(p []byte) (n int, err error) {
 	}
 	}
 	n, err = b.pipe.Read(p)
 	n, err = b.pipe.Read(p)
 	if n > 0 {
 	if n > 0 {
-		b.conn.sendWindowUpdate(b.stream, n)
+		b.conn.noteBodyReadFromHandler(b.stream, n)
 	}
 	}
 	return
 	return
 }
 }

+ 2 - 9
write.go

@@ -195,17 +195,10 @@ func (w write100ContinueHeadersFrame) writeFrame(ctx writeContext) error {
 }
 }
 
 
 type writeWindowUpdate struct {
 type writeWindowUpdate struct {
-	streamID uint32
+	streamID uint32 // or 0 for conn-level
 	n        uint32
 	n        uint32
 }
 }
 
 
 func (wu writeWindowUpdate) writeFrame(ctx writeContext) error {
 func (wu writeWindowUpdate) writeFrame(ctx writeContext) error {
-	fr := ctx.Framer()
-	if err := fr.WriteWindowUpdate(0, wu.n); err != nil {
-		return err
-	}
-	if err := fr.WriteWindowUpdate(wu.streamID, wu.n); err != nil {
-		return err
-	}
-	return nil
+	return ctx.Framer().WriteWindowUpdate(wu.streamID, wu.n)
 }
 }