|
|
@@ -6,14 +6,53 @@ package http2
|
|
|
|
|
|
import "fmt"
|
|
|
|
|
|
-// frameWriteMsg is a request to write a frame.
|
|
|
-type frameWriteMsg struct {
|
|
|
+// WriteScheduler is the interface implemented by HTTP/2 write schedulers.
|
|
|
+// Methods are never called concurrently.
|
|
|
+type WriteScheduler interface {
|
|
|
+ // OpenStream opens a new stream in the write scheduler.
|
|
|
+ // It is illegal to call this with streamID=0 or with a streamID that is
|
|
|
+ // already open -- the call may panic.
|
|
|
+ OpenStream(streamID uint32, options OpenStreamOptions)
|
|
|
+
|
|
|
+ // CloseStream closes a stream in the write scheduler. Any frames queued on
|
|
|
+ // this stream should be discarded. It is illegal to call this on a stream
|
|
|
+ // that is not open -- the call may panic.
|
|
|
+ CloseStream(streamID uint32)
|
|
|
+
|
|
|
+ // AdjustStream adjusts the priority of the given stream. This may be called
|
|
|
+ // on a stream that has not yet been opened or has been closed. Note that
|
|
|
+ // RFC 7540 allows PRIORITY frames to be sent on streams in any state. See:
|
|
|
+ // https://tools.ietf.org/html/rfc7540#section-5.1
|
|
|
+ AdjustStream(streamID uint32, priority PriorityParam)
|
|
|
+
|
|
|
+ // Push queues a frame in the scheduler. In most cases, this will not be
|
|
|
+ // called with wr.StreamID()!=0 unless that stream is currently open. The one
|
|
|
+ // exception is RST_STREAM frames, which may be sent on idle or closed streams.
|
|
|
+ Push(wr FrameWriteRequest)
|
|
|
+
|
|
|
+ // Pop dequeues the next frame to write. Returns false if no frames can
|
|
|
+ // be written. Frames with a given wr.StreamID() are Pop'd in the same
|
|
|
+ // order they are Push'd.
|
|
|
+ Pop() (wr FrameWriteRequest, ok bool)
|
|
|
+}
|
|
|
+
|
|
|
+// OpenStreamOptions specifies extra options for WriteScheduler.OpenStream.
|
|
|
+type OpenStreamOptions struct {
|
|
|
+ // PusherID is zero if the stream was initiated by the client. Otherwise,
|
|
|
+ // PusherID names the stream that pushed the newly opened stream.
|
|
|
+ PusherID uint32
|
|
|
+}
|
|
|
+
|
|
|
+// FrameWriteRequest is a request to write a frame.
|
|
|
+type FrameWriteRequest struct {
|
|
|
// write is the interface value that does the writing, once the
|
|
|
- // writeScheduler (below) has decided to select this frame
|
|
|
- // to write. The write functions are all defined in write.go.
|
|
|
+ // WriteScheduler has selected this frame to write. The write
|
|
|
+ // functions are all defined in write.go.
|
|
|
write writeFramer
|
|
|
|
|
|
- stream *stream // used for prioritization. nil for non-stream frames.
|
|
|
+ // stream is the stream on which this frame will be written.
|
|
|
+ // nil for non-stream frames like PING and SETTINGS.
|
|
|
+ stream *stream
|
|
|
|
|
|
// done, if non-nil, must be a buffered channel with space for
|
|
|
// 1 message and is sent the return value from write (or an
|
|
|
@@ -21,263 +60,183 @@ type frameWriteMsg struct {
|
|
|
done chan error
|
|
|
}
|
|
|
|
|
|
-// for debugging only:
|
|
|
-func (wm frameWriteMsg) String() string {
|
|
|
- var streamID uint32
|
|
|
- if wm.stream != nil {
|
|
|
- streamID = wm.stream.id
|
|
|
- }
|
|
|
- var des string
|
|
|
- if s, ok := wm.write.(fmt.Stringer); ok {
|
|
|
- des = s.String()
|
|
|
- } else {
|
|
|
- des = fmt.Sprintf("%T", wm.write)
|
|
|
- }
|
|
|
- return fmt.Sprintf("[frameWriteMsg stream=%d, ch=%v, type: %v]", streamID, wm.done != nil, des)
|
|
|
-}
|
|
|
-
|
|
|
-// writeScheduler tracks pending frames to write, priorities, and decides
|
|
|
-// the next one to use. It is not thread-safe.
|
|
|
-type writeScheduler struct {
|
|
|
- // zero are frames not associated with a specific stream.
|
|
|
- // They're sent before any stream-specific freams.
|
|
|
- zero writeQueue
|
|
|
-
|
|
|
- // maxFrameSize is the maximum size of a DATA frame
|
|
|
- // we'll write. Must be non-zero and between 16K-16M.
|
|
|
- maxFrameSize uint32
|
|
|
-
|
|
|
- // sq contains the stream-specific queues, keyed by stream ID.
|
|
|
- // when a stream is idle, it's deleted from the map.
|
|
|
- sq map[uint32]*writeQueue
|
|
|
-
|
|
|
- // canSend is a slice of memory that's reused between frame
|
|
|
- // scheduling decisions to hold the list of writeQueues (from sq)
|
|
|
- // which have enough flow control data to send. After canSend is
|
|
|
- // built, the best is selected.
|
|
|
- canSend []*writeQueue
|
|
|
-
|
|
|
- // pool of empty queues for reuse.
|
|
|
- queuePool []*writeQueue
|
|
|
-}
|
|
|
-
|
|
|
-func (ws *writeScheduler) putEmptyQueue(q *writeQueue) {
|
|
|
- if len(q.s) != 0 {
|
|
|
- panic("queue must be empty")
|
|
|
- }
|
|
|
- ws.queuePool = append(ws.queuePool, q)
|
|
|
-}
|
|
|
-
|
|
|
-func (ws *writeScheduler) getEmptyQueue() *writeQueue {
|
|
|
- ln := len(ws.queuePool)
|
|
|
- if ln == 0 {
|
|
|
- return new(writeQueue)
|
|
|
- }
|
|
|
- q := ws.queuePool[ln-1]
|
|
|
- ws.queuePool = ws.queuePool[:ln-1]
|
|
|
- return q
|
|
|
-}
|
|
|
-
|
|
|
-func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
|
|
|
-
|
|
|
-func (ws *writeScheduler) add(wm frameWriteMsg) {
|
|
|
- st := wm.stream
|
|
|
- if st == nil {
|
|
|
- ws.zero.push(wm)
|
|
|
- } else {
|
|
|
- ws.streamQueue(st.id).push(wm)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
|
|
|
- if q, ok := ws.sq[streamID]; ok {
|
|
|
- return q
|
|
|
- }
|
|
|
- if ws.sq == nil {
|
|
|
- ws.sq = make(map[uint32]*writeQueue)
|
|
|
- }
|
|
|
- q := ws.getEmptyQueue()
|
|
|
- ws.sq[streamID] = q
|
|
|
- return q
|
|
|
-}
|
|
|
-
|
|
|
-// take returns the most important frame to write and removes it from the scheduler.
|
|
|
-// It is illegal to call this if the scheduler is empty or if there are no connection-level
|
|
|
-// flow control bytes available.
|
|
|
-func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
|
|
|
- if ws.maxFrameSize == 0 {
|
|
|
- panic("internal error: ws.maxFrameSize not initialized or invalid")
|
|
|
- }
|
|
|
-
|
|
|
- // If there any frames not associated with streams, prefer those first.
|
|
|
- // These are usually SETTINGS, etc.
|
|
|
- if !ws.zero.empty() {
|
|
|
- return ws.zero.shift(), true
|
|
|
- }
|
|
|
- if len(ws.sq) == 0 {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- // Next, prioritize frames on streams that aren't DATA frames (no cost).
|
|
|
- for id, q := range ws.sq {
|
|
|
- if q.firstIsNoCost() {
|
|
|
- return ws.takeFrom(id, q)
|
|
|
+// StreamID returns the id of the stream this frame will be written to.
|
|
|
+// 0 is used for non-stream frames such as PING and SETTINGS.
|
|
|
+func (wr FrameWriteRequest) StreamID() uint32 {
|
|
|
+ if wr.stream == nil {
|
|
|
+ if se, ok := wr.write.(StreamError); ok {
|
|
|
+ // (*serverConn).resetStream doesn't set
|
|
|
+ // stream because it doesn't necessarily have
|
|
|
+ // one. So special case this type of write
|
|
|
+ // message.
|
|
|
+ return se.StreamID
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- // Now, all that remains are DATA frames with non-zero bytes to
|
|
|
- // send. So pick the best one.
|
|
|
- if len(ws.canSend) != 0 {
|
|
|
- panic("should be empty")
|
|
|
- }
|
|
|
- for _, q := range ws.sq {
|
|
|
- if n := ws.streamWritableBytes(q); n > 0 {
|
|
|
- ws.canSend = append(ws.canSend, q)
|
|
|
- }
|
|
|
- }
|
|
|
- if len(ws.canSend) == 0 {
|
|
|
- return
|
|
|
- }
|
|
|
- defer ws.zeroCanSend()
|
|
|
-
|
|
|
- // TODO: find the best queue
|
|
|
- q := ws.canSend[0]
|
|
|
-
|
|
|
- return ws.takeFrom(q.streamID(), q)
|
|
|
-}
|
|
|
-
|
|
|
-// zeroCanSend is defered from take.
|
|
|
-func (ws *writeScheduler) zeroCanSend() {
|
|
|
- for i := range ws.canSend {
|
|
|
- ws.canSend[i] = nil
|
|
|
- }
|
|
|
- ws.canSend = ws.canSend[:0]
|
|
|
-}
|
|
|
-
|
|
|
-// streamWritableBytes returns the number of DATA bytes we could write
|
|
|
-// from the given queue's stream, if this stream/queue were
|
|
|
-// selected. It is an error to call this if q's head isn't a
|
|
|
-// *writeData.
|
|
|
-func (ws *writeScheduler) streamWritableBytes(q *writeQueue) int32 {
|
|
|
- wm := q.head()
|
|
|
- ret := wm.stream.flow.available() // max we can write
|
|
|
- if ret == 0 {
|
|
|
return 0
|
|
|
}
|
|
|
- if int32(ws.maxFrameSize) < ret {
|
|
|
- ret = int32(ws.maxFrameSize)
|
|
|
- }
|
|
|
- if ret == 0 {
|
|
|
- panic("internal error: ws.maxFrameSize not initialized or invalid")
|
|
|
- }
|
|
|
- wd := wm.write.(*writeData)
|
|
|
- if len(wd.p) < int(ret) {
|
|
|
- ret = int32(len(wd.p))
|
|
|
- }
|
|
|
- return ret
|
|
|
-}
|
|
|
-
|
|
|
-func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
|
|
|
- wm = q.head()
|
|
|
- // If the first item in this queue costs flow control tokens
|
|
|
- // and we don't have enough, write as much as we can.
|
|
|
- if wd, ok := wm.write.(*writeData); ok && len(wd.p) > 0 {
|
|
|
- allowed := wm.stream.flow.available() // max we can write
|
|
|
- if allowed == 0 {
|
|
|
- // No quota available. Caller can try the next stream.
|
|
|
- return frameWriteMsg{}, false
|
|
|
+ return wr.stream.id
|
|
|
+}
|
|
|
+
|
|
|
+// DataSize returns the number of flow control bytes that must be consumed
|
|
|
+// to write this entire frame. This is 0 for non-DATA frames.
|
|
|
+func (wr FrameWriteRequest) DataSize() int {
|
|
|
+ if wd, ok := wr.write.(*writeData); ok {
|
|
|
+ return len(wd.p)
|
|
|
+ }
|
|
|
+ return 0
|
|
|
+}
|
|
|
+
|
|
|
+// Consume consumes min(n, available) bytes from this frame, where available
|
|
|
+// is the number of flow control bytes available on the stream. Consume returns
|
|
|
+// 0, 1, or 2 frames, where the integer return value gives the number of frames
|
|
|
+// returned.
|
|
|
+//
|
|
|
+// If flow control prevents consuming any bytes, this returns (_, _, 0). If
|
|
|
+// the entire frame was consumed, this returns (wr, _, 1). Otherwise, this
|
|
|
+// returns (consumed, rest, 2), where 'consumed' contains the consumed bytes and
|
|
|
+// 'rest' contains the remaining bytes. The consumed bytes are deducted from the
|
|
|
+// underlying stream's flow control budget.
|
|
|
+func (wr FrameWriteRequest) Consume(n int32) (FrameWriteRequest, FrameWriteRequest, int) {
|
|
|
+ var empty FrameWriteRequest
|
|
|
+
|
|
|
+ // Non-DATA frames are always consumed whole.
|
|
|
+ wd, ok := wr.write.(*writeData)
|
|
|
+ if !ok || len(wd.p) == 0 {
|
|
|
+ return wr, empty, 1
|
|
|
+ }
|
|
|
+
|
|
|
+ // Might need to split after applying limits.
|
|
|
+ allowed := wr.stream.flow.available()
|
|
|
+ if n < allowed {
|
|
|
+ allowed = n
|
|
|
+ }
|
|
|
+ if wr.stream.sc.maxFrameSize < allowed {
|
|
|
+ allowed = wr.stream.sc.maxFrameSize
|
|
|
+ }
|
|
|
+ if allowed <= 0 {
|
|
|
+ return empty, empty, 0
|
|
|
+ }
|
|
|
+ if len(wd.p) > int(allowed) {
|
|
|
+ wr.stream.flow.take(allowed)
|
|
|
+ consumed := FrameWriteRequest{
|
|
|
+ stream: wr.stream,
|
|
|
+ write: &writeData{
|
|
|
+ streamID: wd.streamID,
|
|
|
+ p: wd.p[:allowed],
|
|
|
+ // Even if the original had endStream set, there
|
|
|
+ // are bytes remaining because len(wd.p) > allowed,
|
|
|
+ // so we know endStream is false.
|
|
|
+ endStream: false,
|
|
|
+ },
|
|
|
+ // Our caller is blocking on the final DATA frame, not
|
|
|
+ // this intermediate frame, so no need to wait.
|
|
|
+ done: nil,
|
|
|
}
|
|
|
- if int32(ws.maxFrameSize) < allowed {
|
|
|
- allowed = int32(ws.maxFrameSize)
|
|
|
- }
|
|
|
- // TODO: further restrict the allowed size, because even if
|
|
|
- // the peer says it's okay to write 16MB data frames, we might
|
|
|
- // want to write smaller ones to properly weight competing
|
|
|
- // streams' priorities.
|
|
|
-
|
|
|
- if len(wd.p) > int(allowed) {
|
|
|
- wm.stream.flow.take(allowed)
|
|
|
- chunk := wd.p[:allowed]
|
|
|
- wd.p = wd.p[allowed:]
|
|
|
- // Make up a new write message of a valid size, rather
|
|
|
- // than shifting one off the queue.
|
|
|
- return frameWriteMsg{
|
|
|
- stream: wm.stream,
|
|
|
- write: &writeData{
|
|
|
- streamID: wd.streamID,
|
|
|
- p: chunk,
|
|
|
- // even if the original had endStream set, there
|
|
|
- // arebytes remaining because len(wd.p) > allowed,
|
|
|
- // so we know endStream is false:
|
|
|
- endStream: false,
|
|
|
- },
|
|
|
- // our caller is blocking on the final DATA frame, not
|
|
|
- // these intermediates, so no need to wait:
|
|
|
- done: nil,
|
|
|
- }, true
|
|
|
+ rest := FrameWriteRequest{
|
|
|
+ stream: wr.stream,
|
|
|
+ write: &writeData{
|
|
|
+ streamID: wd.streamID,
|
|
|
+ p: wd.p[allowed:],
|
|
|
+ endStream: wd.endStream,
|
|
|
+ },
|
|
|
+ done: wr.done,
|
|
|
}
|
|
|
- wm.stream.flow.take(int32(len(wd.p)))
|
|
|
+ return consumed, rest, 2
|
|
|
}
|
|
|
|
|
|
- q.shift()
|
|
|
- if q.empty() {
|
|
|
- ws.putEmptyQueue(q)
|
|
|
- delete(ws.sq, id)
|
|
|
+ // The frame is consumed whole.
|
|
|
+ // NB: This cast cannot overflow because allowed is <= math.MaxInt32.
|
|
|
+ wr.stream.flow.take(int32(len(wd.p)))
|
|
|
+ return wr, empty, 1
|
|
|
+}
|
|
|
+
|
|
|
+// String is for debugging only.
|
|
|
+func (wr FrameWriteRequest) String() string {
|
|
|
+ var des string
|
|
|
+ if s, ok := wr.write.(fmt.Stringer); ok {
|
|
|
+ des = s.String()
|
|
|
+ } else {
|
|
|
+ des = fmt.Sprintf("%T", wr.write)
|
|
|
}
|
|
|
- return wm, true
|
|
|
+ return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)
|
|
|
}
|
|
|
|
|
|
-func (ws *writeScheduler) forgetStream(id uint32) {
|
|
|
- q, ok := ws.sq[id]
|
|
|
- if !ok {
|
|
|
+// replyToWriter sends err to wr.done and panics if the send must block
|
|
|
+// This does nothing if wr.done is nil.
|
|
|
+func (wr *FrameWriteRequest) replyToWriter(err error) {
|
|
|
+ if wr.done == nil {
|
|
|
return
|
|
|
}
|
|
|
- delete(ws.sq, id)
|
|
|
-
|
|
|
- // But keep it for others later.
|
|
|
- for i := range q.s {
|
|
|
- q.s[i] = frameWriteMsg{}
|
|
|
+ select {
|
|
|
+ case wr.done <- err:
|
|
|
+ default:
|
|
|
+ panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
|
|
|
}
|
|
|
- q.s = q.s[:0]
|
|
|
- ws.putEmptyQueue(q)
|
|
|
+ wr.write = nil // prevent use (assume it's tainted after wr.done send)
|
|
|
}
|
|
|
|
|
|
+// writeQueue is used by implementations of WriteScheduler.
|
|
|
type writeQueue struct {
|
|
|
- s []frameWriteMsg
|
|
|
+ s []FrameWriteRequest
|
|
|
}
|
|
|
|
|
|
-// streamID returns the stream ID for a non-empty stream-specific queue.
|
|
|
-func (q *writeQueue) streamID() uint32 { return q.s[0].stream.id }
|
|
|
-
|
|
|
func (q *writeQueue) empty() bool { return len(q.s) == 0 }
|
|
|
|
|
|
-func (q *writeQueue) push(wm frameWriteMsg) {
|
|
|
- q.s = append(q.s, wm)
|
|
|
+func (q *writeQueue) push(wr FrameWriteRequest) {
|
|
|
+ q.s = append(q.s, wr)
|
|
|
}
|
|
|
|
|
|
-// head returns the next item that would be removed by shift.
|
|
|
-func (q *writeQueue) head() frameWriteMsg {
|
|
|
+func (q *writeQueue) shift() FrameWriteRequest {
|
|
|
if len(q.s) == 0 {
|
|
|
panic("invalid use of queue")
|
|
|
}
|
|
|
- return q.s[0]
|
|
|
+ wr := q.s[0]
|
|
|
+ // TODO: less copy-happy queue.
|
|
|
+ copy(q.s, q.s[1:])
|
|
|
+ q.s[len(q.s)-1] = FrameWriteRequest{}
|
|
|
+ q.s = q.s[:len(q.s)-1]
|
|
|
+ return wr
|
|
|
}
|
|
|
|
|
|
-func (q *writeQueue) shift() frameWriteMsg {
|
|
|
+// consume consumes up to n bytes from q.s[0]. If the frame is
|
|
|
+// entirely consumed, it is removed from the queue. If the frame
|
|
|
+// is partially consumed, the frame is kept with the consumed
|
|
|
+// bytes removed. Returns true iff any bytes were consumed.
|
|
|
+func (q *writeQueue) consume(n int32) (FrameWriteRequest, bool) {
|
|
|
if len(q.s) == 0 {
|
|
|
- panic("invalid use of queue")
|
|
|
+ return FrameWriteRequest{}, false
|
|
|
}
|
|
|
- wm := q.s[0]
|
|
|
- // TODO: less copy-happy queue.
|
|
|
- copy(q.s, q.s[1:])
|
|
|
- q.s[len(q.s)-1] = frameWriteMsg{}
|
|
|
- q.s = q.s[:len(q.s)-1]
|
|
|
- return wm
|
|
|
+ consumed, rest, numresult := q.s[0].Consume(n)
|
|
|
+ switch numresult {
|
|
|
+ case 0:
|
|
|
+ return FrameWriteRequest{}, false
|
|
|
+ case 1:
|
|
|
+ q.shift()
|
|
|
+ case 2:
|
|
|
+ q.s[0] = rest
|
|
|
+ }
|
|
|
+ return consumed, true
|
|
|
+}
|
|
|
+
|
|
|
+type writeQueuePool []*writeQueue
|
|
|
+
|
|
|
+// put inserts an unused writeQueue into the pool.
|
|
|
+func (p *writeQueuePool) put(q *writeQueue) {
|
|
|
+ for i := range q.s {
|
|
|
+ q.s[i] = FrameWriteRequest{}
|
|
|
+ }
|
|
|
+ q.s = q.s[:0]
|
|
|
+ *p = append(*p, q)
|
|
|
}
|
|
|
|
|
|
-func (q *writeQueue) firstIsNoCost() bool {
|
|
|
- if df, ok := q.s[0].write.(*writeData); ok {
|
|
|
- return len(df.p) == 0
|
|
|
+// get returns an empty writeQueue.
|
|
|
+func (p *writeQueuePool) get() *writeQueue {
|
|
|
+ ln := len(*p)
|
|
|
+ if ln == 0 {
|
|
|
+ return new(writeQueue)
|
|
|
}
|
|
|
- return true
|
|
|
+ x := ln - 1
|
|
|
+ q := (*p)[x]
|
|
|
+ (*p)[x] = nil
|
|
|
+ *p = (*p)[:x]
|
|
|
+ return q
|
|
|
}
|