Browse Source

Respect peer's connection-level flow control

Previously connection-level flow control is not respected at all.
Only stream-level one is used.  Also there are 3 bugs with regard to
flow control.  First one is that flow window is not drained when
allowed quota is greater than or equal to the number of bytes to
write.  Second one is that 0 length DATA frame cannot be sent when
allowed quota == 0.  Third one is that if DATA frmae is processed in
"fast path", flow control is not ignored.  This commit adds
connection-flow control enforcement and fixes these bugs.
Tatsuhiro Tsujikawa 11 years ago
parent
commit
cc1e1da51b
3 changed files with 18 additions and 4 deletions
  1. 7 1
      server.go
  2. 5 1
      server_test.go
  3. 6 2
      writesched.go

+ 7 - 1
server.go

@@ -591,11 +591,15 @@ func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) {
 func (sc *serverConn) writeFrame(wm frameWriteMsg) {
 func (sc *serverConn) writeFrame(wm frameWriteMsg) {
 	sc.serveG.check()
 	sc.serveG.check()
 	// Fast path for common case:
 	// Fast path for common case:
-	if !sc.writingFrame && sc.writeSched.empty() {
+	if _, ok := wm.write.(*writeData); !ok && !sc.writingFrame && sc.writeSched.empty() {
 		sc.startFrameWrite(wm)
 		sc.startFrameWrite(wm)
 		return
 		return
 	}
 	}
 	sc.writeSched.add(wm)
 	sc.writeSched.add(wm)
+	// Sometimes no sc.scheduleFrameWrite() is called after we
+	// added wm to sc.writeSched and frames are completely
+	// blocked. To prevent this happing, we call it here.
+	sc.scheduleFrameWrite()
 }
 }
 
 
 // startFrameWrite starts a goroutine to write wm (in a separate
 // startFrameWrite starts a goroutine to write wm (in a separate
@@ -1054,6 +1058,8 @@ func (sc *serverConn) processHeaders(f *HeadersFrame) error {
 		id:    id,
 		id:    id,
 		state: stateOpen,
 		state: stateOpen,
 	}
 	}
+	// connection-level flow control is shared by all streams.
+	st.flow.conn = &sc.flow
 	st.flow.add(sc.initialWindowSize)
 	st.flow.add(sc.initialWindowSize)
 	st.cw.Init() // make Cond use its Mutex, without heap-promoting them separately
 	st.cw.Init() // make Cond use its Mutex, without heap-promoting them separately
 	if f.StreamEnded() {
 	if f.StreamEnded() {

+ 5 - 1
server_test.go

@@ -1408,7 +1408,11 @@ func TestServer_Response_LargeWrite(t *testing.T) {
 		if err := st.fr.WriteWindowUpdate(1, size); err != nil {
 		if err := st.fr.WriteWindowUpdate(1, size); err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
-
+		// Give the handler quote to write to connection-level
+		// window as well
+		if err := st.fr.WriteWindowUpdate(0, size); err != nil {
+			t.Fatal(err)
+		}
 		hf := st.wantHeaders()
 		hf := st.wantHeaders()
 		if hf.StreamEnded() {
 		if hf.StreamEnded() {
 			t.Fatal("unexpected END_STREAM flag")
 			t.Fatal("unexpected END_STREAM flag")

+ 6 - 2
writesched.go

@@ -145,14 +145,17 @@ func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg,
 	// and we don't have enough, write as much as we can.
 	// and we don't have enough, write as much as we can.
 	if wd, ok := wm.write.(*writeData); ok {
 	if wd, ok := wm.write.(*writeData); ok {
 		allowed := wm.stream.flow.available() // max we can write
 		allowed := wm.stream.flow.available() // max we can write
-		if allowed == 0 {
+		// We can write 0 byte DATA frame (which usually bears
+		// END_STREAM, i.e., last DATA frame) even if allowed
+		// == 0.
+		if len(wd.p) > 0 && allowed == 0 {
 			// No quota available. Caller can try the next stream.
 			// No quota available. Caller can try the next stream.
 			return frameWriteMsg{}, false
 			return frameWriteMsg{}, false
 		}
 		}
 		if int32(ws.maxFrameSize) < allowed {
 		if int32(ws.maxFrameSize) < allowed {
 			allowed = int32(ws.maxFrameSize)
 			allowed = int32(ws.maxFrameSize)
 		}
 		}
-		if allowed == 0 {
+		if len(wd.p) > 0 && allowed == 0 {
 			panic("internal error: ws.maxFrameSize not initialized or invalid")
 			panic("internal error: ws.maxFrameSize not initialized or invalid")
 		}
 		}
 		// TODO: further restrict the allowed size, because even if
 		// TODO: further restrict the allowed size, because even if
@@ -180,6 +183,7 @@ func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg,
 				done: nil,
 				done: nil,
 			}, true
 			}, true
 		}
 		}
+		wm.stream.flow.take(int32(len(wd.p)))
 	}
 	}
 
 
 	q.shift()
 	q.shift()