Browse Source

Write scheduler changes towards selecting between competing writing streams.

Not done yet. Need to finish stream priority handling first.
Brad Fitzpatrick 11 years ago
parent
commit
605f62817c
1 changed files with 59 additions and 8 deletions
  1. 59 8
      writesched.go

+ 59 - 8
writesched.go

@@ -30,12 +30,18 @@ type writeScheduler struct {
 	zero writeQueue
 
 	// maxFrameSize is the maximum size of a DATA frame
-	// we'll write.
+	// we'll write. Must be non-zero and between 16K-16M.
 	maxFrameSize uint32
 
 	// sq contains the stream-specific queues, keyed by stream ID.
 	// when a stream is idle, it's deleted from the map.
 	sq map[uint32]*writeQueue
+
+	// canSend is a slice of memory that's reused between frame
+	// scheduling decisions to hold the list of writeQueues (from sq)
+	// which have enough flow control data to send. After canSend is
+	// built, the best is selected.
+	canSend []*writeQueue
 }
 
 func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
@@ -81,14 +87,56 @@ func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
 		}
 	}
 
-	// Now, all that remains are DATA frames. So pick the best one.
-	// TODO: do that. For now, pick a random one.
-	for id, q := range ws.sq {
-		if wm, ok := ws.takeFrom(id, q); ok {
-			return wm, true
+	// Now, all that remains are DATA frames with non-zero bytes to
+	// send. So pick the best one.
+	if len(ws.canSend) != 0 {
+		panic("should be empty")
+	}
+	for _, q := range ws.sq {
+		if n := ws.streamWritableBytes(q); n > 0 {
+			ws.canSend = append(ws.canSend, q)
 		}
 	}
-	return
+	if len(ws.canSend) == 0 {
+		return
+	}
+	defer ws.zeroCanSend()
+
+	// TODO: find the best queue
+	q := ws.canSend[0]
+
+	return ws.takeFrom(q.streamID(), q)
+}
+
+// zeroCanSend is defered from take.
+func (ws *writeScheduler) zeroCanSend() {
+	for i := range ws.canSend {
+		ws.canSend[i] = nil
+	}
+	ws.canSend = ws.canSend[:0]
+}
+
+// streamWritableBytes returns the number of DATA bytes we could write
+// from the given queue's stream, if this stream/queue were
+// selected. It is an error to call this if q's head isn't a
+// *writeData.
+func (ws *writeScheduler) streamWritableBytes(q *writeQueue) int32 {
+	wm := q.head()
+	ret := wm.stream.flow.available() // max we can write
+	if ret == 0 {
+		return 0
+	}
+	if int32(ws.maxFrameSize) < ret {
+		ret = int32(ws.maxFrameSize)
+	}
+	if ret == 0 {
+		panic("internal error: ws.maxFrameSize not initialized or invalid")
+	}
+	wd := wm.write.(*writeData)
+	if len(wd.p) < int(ret) {
+		ret = int32(len(wd.p))
+	}
+	return ret
 }
 
 func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
@@ -99,7 +147,7 @@ func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg,
 		allowed := wm.stream.flow.available() // max we can write
 		if allowed == 0 {
 			// No quota available. Caller can try the next stream.
-			return wm, false
+			return frameWriteMsg{}, false
 		}
 		if int32(ws.maxFrameSize) < allowed {
 			allowed = int32(ws.maxFrameSize)
@@ -148,6 +196,9 @@ type writeQueue struct {
 	s []frameWriteMsg
 }
 
+// streamID returns the stream ID for a non-empty stream-specific queue.
+func (q *writeQueue) streamID() uint32 { return q.s[0].stream.id }
+
 func (q *writeQueue) empty() bool { return len(q.s) == 0 }
 
 func (q *writeQueue) push(wm frameWriteMsg) {