Browse Source

Merge pull request #19 from tatsuhiro-t/enforce-connfc

Respect peer's connection-level flow control
Brad Fitzpatrick 11 years ago
parent
commit
17a4784adc
3 changed files with 18 additions and 4 deletions
  1. 7 1
      server.go
  2. 5 1
      server_test.go
  3. 6 2
      writesched.go

+ 7 - 1
server.go

@@ -591,11 +591,15 @@ func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) {
 func (sc *serverConn) writeFrame(wm frameWriteMsg) {
 func (sc *serverConn) writeFrame(wm frameWriteMsg) {
 	sc.serveG.check()
 	sc.serveG.check()
 	// Fast path for common case:
 	// Fast path for common case:
-	if !sc.writingFrame && sc.writeSched.empty() {
+	if _, ok := wm.write.(*writeData); !ok && !sc.writingFrame && sc.writeSched.empty() {
 		sc.startFrameWrite(wm)
 		sc.startFrameWrite(wm)
 		return
 		return
 	}
 	}
 	sc.writeSched.add(wm)
 	sc.writeSched.add(wm)
+	// Sometimes no sc.scheduleFrameWrite() is called after we
+	// added wm to sc.writeSched and frames are completely
+	// blocked. To prevent this happing, we call it here.
+	sc.scheduleFrameWrite()
 }
 }
 
 
 // startFrameWrite starts a goroutine to write wm (in a separate
 // startFrameWrite starts a goroutine to write wm (in a separate
@@ -1054,6 +1058,8 @@ func (sc *serverConn) processHeaders(f *HeadersFrame) error {
 		id:    id,
 		id:    id,
 		state: stateOpen,
 		state: stateOpen,
 	}
 	}
+	// connection-level flow control is shared by all streams.
+	st.flow.conn = &sc.flow
 	st.flow.add(sc.initialWindowSize)
 	st.flow.add(sc.initialWindowSize)
 	st.cw.Init() // make Cond use its Mutex, without heap-promoting them separately
 	st.cw.Init() // make Cond use its Mutex, without heap-promoting them separately
 	if f.StreamEnded() {
 	if f.StreamEnded() {

+ 5 - 1
server_test.go

@@ -1408,7 +1408,11 @@ func TestServer_Response_LargeWrite(t *testing.T) {
 		if err := st.fr.WriteWindowUpdate(1, size); err != nil {
 		if err := st.fr.WriteWindowUpdate(1, size); err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
-
+		// Give the handler quote to write to connection-level
+		// window as well
+		if err := st.fr.WriteWindowUpdate(0, size); err != nil {
+			t.Fatal(err)
+		}
 		hf := st.wantHeaders()
 		hf := st.wantHeaders()
 		if hf.StreamEnded() {
 		if hf.StreamEnded() {
 			t.Fatal("unexpected END_STREAM flag")
 			t.Fatal("unexpected END_STREAM flag")

+ 6 - 2
writesched.go

@@ -145,14 +145,17 @@ func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg,
 	// and we don't have enough, write as much as we can.
 	// and we don't have enough, write as much as we can.
 	if wd, ok := wm.write.(*writeData); ok {
 	if wd, ok := wm.write.(*writeData); ok {
 		allowed := wm.stream.flow.available() // max we can write
 		allowed := wm.stream.flow.available() // max we can write
-		if allowed == 0 {
+		// We can write 0 byte DATA frame (which usually bears
+		// END_STREAM, i.e., last DATA frame) even if allowed
+		// == 0.
+		if len(wd.p) > 0 && allowed == 0 {
 			// No quota available. Caller can try the next stream.
 			// No quota available. Caller can try the next stream.
 			return frameWriteMsg{}, false
 			return frameWriteMsg{}, false
 		}
 		}
 		if int32(ws.maxFrameSize) < allowed {
 		if int32(ws.maxFrameSize) < allowed {
 			allowed = int32(ws.maxFrameSize)
 			allowed = int32(ws.maxFrameSize)
 		}
 		}
-		if allowed == 0 {
+		if len(wd.p) > 0 && allowed == 0 {
 			panic("internal error: ws.maxFrameSize not initialized or invalid")
 			panic("internal error: ws.maxFrameSize not initialized or invalid")
 		}
 		}
 		// TODO: further restrict the allowed size, because even if
 		// TODO: further restrict the allowed size, because even if
@@ -180,6 +183,7 @@ func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg,
 				done: nil,
 				done: nil,
 			}, true
 			}, true
 		}
 		}
+		wm.stream.flow.take(int32(len(wd.p)))
 	}
 	}
 
 
 	q.shift()
 	q.shift()