| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- // Copyright 2014 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // See https://code.google.com/p/go/source/browse/CONTRIBUTORS
- // Licensed under the same terms as Go itself:
- // https://code.google.com/p/go/source/browse/LICENSE
- package http2
- // frameWriteMsg is a request to write a frame.
- type frameWriteMsg struct {
- // write is the function that does the writing, once the
- // writeScheduler (below) has decided to select this frame
- // to write. The write functions are all defined in write.go.
- write func(ctx writeContext, v interface{}) error
- // v is the argument passed to the write function. See each
- // function in write.go to see which type they should be,
- // depending on what write is.
- v interface{}
- cost uint32 // if DATA, number of flow control bytes required
- stream *stream // used for prioritization
- endStream bool // stream is being closed locally
- // done, if non-nil, must be a buffered channel with space for
- // 1 message and is sent the return value from write (or an
- // earlier error) when the frame has been written.
- done chan error
- }
- // writeScheduler tracks pending frames to write, priorities, and decides
- // the next one to use. It is not thread-safe.
- type writeScheduler struct {
- // zero are frames not associated with a specific stream.
- // They're sent before any stream-specific freams.
- zero writeQueue
- sq map[uint32]*writeQueue
- }
- func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
- func (ws *writeScheduler) add(wm frameWriteMsg) {
- st := wm.stream
- if st == nil {
- ws.zero.push(wm)
- } else {
- ws.streamQueue(st.id).push(wm)
- }
- }
- func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
- if q, ok := ws.sq[streamID]; ok {
- return q
- }
- if ws.sq == nil {
- ws.sq = make(map[uint32]*writeQueue)
- }
- q := new(writeQueue)
- ws.sq[streamID] = q
- return q
- }
- // take returns the most important frame to write and removes it from the scheduler.
- // It is illegal to call this if the scheduler is empty.
- func (ws *writeScheduler) take() frameWriteMsg {
- // If there any frames not associated with streams, prefer those first.
- // These are usually SETTINGS, etc.
- if !ws.zero.empty() {
- return ws.zero.shift()
- }
- if len(ws.sq) == 0 {
- panic("internal error: take should only be called if non-empty")
- }
- // Next, prioritize frames on streams that aren't DATA frames (no cost).
- for id, q := range ws.sq {
- if q.firstIsNoCost() {
- return ws.takeFrom(id, q)
- }
- }
- // Now, all that remains are DATA frames. So pick the best one.
- // TODO: do that. For now, pick a random one.
- for id, q := range ws.sq {
- return ws.takeFrom(id, q)
- }
- panic("internal error: take should only be called if non-empty")
- }
- func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) frameWriteMsg {
- wm := q.shift()
- if q.empty() {
- // TODO: reclaim its slice and use it for future allocations
- // in the writeScheduler.streamQueue method above when making
- // the writeQueue.
- delete(ws.sq, id)
- }
- return wm
- }
- type writeQueue struct {
- s []frameWriteMsg
- }
- func (q *writeQueue) empty() bool { return len(q.s) == 0 }
- func (q *writeQueue) push(wm frameWriteMsg) {
- q.s = append(q.s, wm)
- }
- func (q *writeQueue) shift() frameWriteMsg {
- if len(q.s) == 0 {
- panic("invalid use of queue")
- }
- wm := q.s[0]
- // TODO: less copy-happy queue.
- copy(q.s, q.s[1:])
- q.s[len(q.s)-1] = frameWriteMsg{}
- q.s = q.s[:len(q.s)-1]
- return wm
- }
- func (q *writeQueue) firstIsNoCost() bool { return q.s[0].cost == 0 }
|