Browse Source

Simplify write scheduler a bit, remove serverConn.writeFrame's fast path for now

Brad Fitzpatrick 11 years ago
parent
commit
a92fa95e54
2 changed files with 20 additions and 27 deletions
  1. 8 14
      server.go
  2. 12 13
      writesched.go

+ 8 - 14
server.go

@@ -582,23 +582,17 @@ func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) {
 	}
 	}
 }
 }
 
 
-// writeFrame either sends wm to the writeFrames goroutine, or
-// enqueues it for the future (with no pushback; the serve goroutine
-// never blocks!), for sending when the currently-being-written frame
-// is done writing.
+// writeFrame schedules a frame to write and sends it if there's nothing
+// already being written.
 //
 //
-// If you're not on the serve goroutine, use writeFrame instead.
+// There is no pushback here (the serve goroutine never blocks). It's
+// the http.Handlers that block, waiting for their previous frames to
+// make it onto the wire
+//
+// If you're not on the serve goroutine, use writeFrameFromHandler instead.
 func (sc *serverConn) writeFrame(wm frameWriteMsg) {
 func (sc *serverConn) writeFrame(wm frameWriteMsg) {
 	sc.serveG.check()
 	sc.serveG.check()
-	// Fast path for common case:
-	if _, ok := wm.write.(*writeData); !ok && !sc.writingFrame && sc.writeSched.empty() {
-		sc.startFrameWrite(wm)
-		return
-	}
 	sc.writeSched.add(wm)
 	sc.writeSched.add(wm)
-	// Sometimes no sc.scheduleFrameWrite() is called after we
-	// added wm to sc.writeSched and frames are completely
-	// blocked. To prevent this happing, we call it here.
 	sc.scheduleFrameWrite()
 	sc.scheduleFrameWrite()
 }
 }
 
 
@@ -622,7 +616,7 @@ func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
 				sc.wroteFrameCh <- struct{}{}
 				sc.wroteFrameCh <- struct{}{}
 				return
 				return
 			}
 			}
-			panic("internal error: attempt to send a frame on a closed stream")
+			panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wm))
 		}
 		}
 	}
 	}
 
 

+ 12 - 13
writesched.go

@@ -88,6 +88,10 @@ func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
 // It is illegal to call this if the scheduler is empty or if there are no connection-level
 // It is illegal to call this if the scheduler is empty or if there are no connection-level
 // flow control bytes available.
 // flow control bytes available.
 func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
 func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
+	if ws.maxFrameSize == 0 {
+		panic("internal error: ws.maxFrameSize not initialized or invalid")
+	}
+
 	// If there any frames not associated with streams, prefer those first.
 	// If there any frames not associated with streams, prefer those first.
 	// These are usually SETTINGS, etc.
 	// These are usually SETTINGS, etc.
 	if !ws.zero.empty() {
 	if !ws.zero.empty() {
@@ -160,25 +164,20 @@ func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg,
 	wm = q.head()
 	wm = q.head()
 	// If the first item in this queue costs flow control tokens
 	// If the first item in this queue costs flow control tokens
 	// and we don't have enough, write as much as we can.
 	// and we don't have enough, write as much as we can.
-	if wd, ok := wm.write.(*writeData); ok {
+	if wd, ok := wm.write.(*writeData); ok && len(wd.p) > 0 {
 		allowed := wm.stream.flow.available() // max we can write
 		allowed := wm.stream.flow.available() // max we can write
-		// We can write 0 byte DATA frame (which usually bears
-		// END_STREAM, i.e., last DATA frame) even if allowed
-		// == 0.
-		if len(wd.p) > 0 && allowed == 0 {
+		if allowed == 0 {
 			// No quota available. Caller can try the next stream.
 			// No quota available. Caller can try the next stream.
 			return frameWriteMsg{}, false
 			return frameWriteMsg{}, false
 		}
 		}
 		if int32(ws.maxFrameSize) < allowed {
 		if int32(ws.maxFrameSize) < allowed {
 			allowed = int32(ws.maxFrameSize)
 			allowed = int32(ws.maxFrameSize)
 		}
 		}
-		if len(wd.p) > 0 && allowed == 0 {
-			panic("internal error: ws.maxFrameSize not initialized or invalid")
-		}
 		// TODO: further restrict the allowed size, because even if
 		// TODO: further restrict the allowed size, because even if
 		// the peer says it's okay to write 16MB data frames, we might
 		// the peer says it's okay to write 16MB data frames, we might
 		// want to write smaller ones to properly weight competing
 		// want to write smaller ones to properly weight competing
 		// streams' priorities.
 		// streams' priorities.
+
 		if len(wd.p) > int(allowed) {
 		if len(wd.p) > int(allowed) {
 			wm.stream.flow.take(allowed)
 			wm.stream.flow.take(allowed)
 			chunk := wd.p[:allowed]
 			chunk := wd.p[:allowed]
@@ -190,13 +189,13 @@ func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg,
 				write: &writeData{
 				write: &writeData{
 					streamID: wd.streamID,
 					streamID: wd.streamID,
 					p:        chunk,
 					p:        chunk,
-					// even if the original was true, there are bytes
-					// remaining because len(wd.p) > allowed, so we
-					// know endStream is false:
+					// even if the original had endStream set, there
+					// arebytes remaining because len(wd.p) > allowed,
+					// so we know endStream is false:
 					endStream: false,
 					endStream: false,
 				},
 				},
-				// completeness.  our caller is blocking on the final
-				// DATA frame, not these intermediates:
+				// our caller is blocking on the final DATA frame, not
+				// these intermediates, so no need to wait:
 				done: nil,
 				done: nil,
 			}, true
 			}, true
 		}
 		}