Browse Source

Forget pending frames when stream is closed (e.g. RST)

Also, re-use writeQueue structs and their slices to make less garbage.
Brad Fitzpatrick 11 years ago
parent
commit
5b95eb395f
2 changed files with 38 additions and 5 deletions
  1. 1 1
      server.go
  2. 37 4
      writesched.go

+ 1 - 1
server.go

@@ -882,7 +882,6 @@ func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
 	if ok {
 	if ok {
 		st.gotReset = true
 		st.gotReset = true
 		sc.closeStream(st, StreamError{f.StreamID, f.ErrCode})
 		sc.closeStream(st, StreamError{f.StreamID, f.ErrCode})
-		// XXX TODO drain writeSched for that stream
 	}
 	}
 	return nil
 	return nil
 }
 }
@@ -899,6 +898,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
 		p.Close(err)
 		p.Close(err)
 	}
 	}
 	st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
 	st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
+	sc.writeSched.forgetStream(st.id)
 }
 }
 
 
 func (sc *serverConn) processSettings(f *SettingsFrame) error {
 func (sc *serverConn) processSettings(f *SettingsFrame) error {

+ 37 - 4
writesched.go

@@ -59,6 +59,26 @@ type writeScheduler struct {
 	// which have enough flow control data to send. After canSend is
 	// which have enough flow control data to send. After canSend is
 	// built, the best is selected.
 	// built, the best is selected.
 	canSend []*writeQueue
 	canSend []*writeQueue
+
+	// pool of empty queues for reuse.
+	queuePool []*writeQueue
+}
+
+func (ws *writeScheduler) putEmptyQueue(q *writeQueue) {
+	if len(q.s) != 0 {
+		panic("queue must be empty")
+	}
+	ws.queuePool = append(ws.queuePool, q)
+}
+
+func (ws *writeScheduler) getEmptyQueue() *writeQueue {
+	ln := len(ws.queuePool)
+	if ln == 0 {
+		return new(writeQueue)
+	}
+	q := ws.queuePool[ln-1]
+	ws.queuePool = ws.queuePool[:ln-1]
+	return q
 }
 }
 
 
 func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
 func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
@@ -79,7 +99,7 @@ func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
 	if ws.sq == nil {
 	if ws.sq == nil {
 		ws.sq = make(map[uint32]*writeQueue)
 		ws.sq = make(map[uint32]*writeQueue)
 	}
 	}
-	q := new(writeQueue)
+	q := ws.getEmptyQueue()
 	ws.sq[streamID] = q
 	ws.sq[streamID] = q
 	return q
 	return q
 }
 }
@@ -204,14 +224,27 @@ func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg,
 
 
 	q.shift()
 	q.shift()
 	if q.empty() {
 	if q.empty() {
-		// TODO: reclaim its slice and use it for future allocations
-		// in the writeScheduler.streamQueue method above when making
-		// the writeQueue.
+		ws.putEmptyQueue(q)
 		delete(ws.sq, id)
 		delete(ws.sq, id)
 	}
 	}
 	return wm, true
 	return wm, true
 }
 }
 
 
+func (ws *writeScheduler) forgetStream(id uint32) {
+	q, ok := ws.sq[id]
+	if !ok {
+		return
+	}
+	delete(ws.sq, id)
+
+	// But keep it for others later.
+	for i := range q.s {
+		q.s[i] = frameWriteMsg{}
+	}
+	q.s = q.s[:0]
+	ws.putEmptyQueue(q)
+}
+
 type writeQueue struct {
 type writeQueue struct {
 	s []frameWriteMsg
 	s []frameWriteMsg
 }
 }