|
|
@@ -23,6 +23,7 @@ import (
|
|
|
"fmt"
|
|
|
"runtime"
|
|
|
"sync"
|
|
|
+ "sync/atomic"
|
|
|
|
|
|
"golang.org/x/net/http2"
|
|
|
"golang.org/x/net/http2/hpack"
|
|
|
@@ -84,12 +85,24 @@ func (il *itemList) isEmpty() bool {
|
|
|
// the control buffer of transport. They represent different aspects of
|
|
|
// control tasks, e.g., flow control, settings, streaming resetting, etc.
|
|
|
|
|
|
+// maxQueuedTransportResponseFrames is the most queued "transport response"
|
|
|
+// frames we will buffer before preventing new reads from occurring on the
|
|
|
+// transport. These are control frames sent in response to client requests,
|
|
|
+// such as RST_STREAM due to bad headers or settings acks.
|
|
|
+const maxQueuedTransportResponseFrames = 50
|
|
|
+
|
|
|
+type cbItem interface {
|
|
|
+ isTransportResponseFrame() bool
|
|
|
+}
|
|
|
+
|
|
|
// registerStream is used to register an incoming stream with loopy writer.
|
|
|
type registerStream struct {
|
|
|
streamID uint32
|
|
|
wq *writeQuota
|
|
|
}
|
|
|
|
|
|
+func (*registerStream) isTransportResponseFrame() bool { return false }
|
|
|
+
|
|
|
// headerFrame is also used to register stream on the client-side.
|
|
|
type headerFrame struct {
|
|
|
streamID uint32
|
|
|
@@ -102,6 +115,10 @@ type headerFrame struct {
|
|
|
onOrphaned func(error) // Valid on client-side
|
|
|
}
|
|
|
|
|
|
+func (h *headerFrame) isTransportResponseFrame() bool {
|
|
|
+ return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
|
|
|
+}
|
|
|
+
|
|
|
type cleanupStream struct {
|
|
|
streamID uint32
|
|
|
rst bool
|
|
|
@@ -109,6 +126,8 @@ type cleanupStream struct {
|
|
|
onWrite func()
|
|
|
}
|
|
|
|
|
|
+func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
|
|
|
+
|
|
|
type dataFrame struct {
|
|
|
streamID uint32
|
|
|
endStream bool
|
|
|
@@ -119,27 +138,41 @@ type dataFrame struct {
|
|
|
onEachWrite func()
|
|
|
}
|
|
|
|
|
|
+func (*dataFrame) isTransportResponseFrame() bool { return false }
|
|
|
+
|
|
|
type incomingWindowUpdate struct {
|
|
|
streamID uint32
|
|
|
increment uint32
|
|
|
}
|
|
|
|
|
|
+func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
|
|
|
+
|
|
|
type outgoingWindowUpdate struct {
|
|
|
streamID uint32
|
|
|
increment uint32
|
|
|
}
|
|
|
|
|
|
+func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
|
|
|
+ return false // window updates are throttled by thresholds
|
|
|
+}
|
|
|
+
|
|
|
type incomingSettings struct {
|
|
|
ss []http2.Setting
|
|
|
}
|
|
|
|
|
|
+func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
|
|
|
+
|
|
|
type outgoingSettings struct {
|
|
|
ss []http2.Setting
|
|
|
}
|
|
|
|
|
|
+func (*outgoingSettings) isTransportResponseFrame() bool { return false }
|
|
|
+
|
|
|
type incomingGoAway struct {
|
|
|
}
|
|
|
|
|
|
+func (*incomingGoAway) isTransportResponseFrame() bool { return false }
|
|
|
+
|
|
|
type goAway struct {
|
|
|
code http2.ErrCode
|
|
|
debugData []byte
|
|
|
@@ -147,15 +180,21 @@ type goAway struct {
|
|
|
closeConn bool
|
|
|
}
|
|
|
|
|
|
+func (*goAway) isTransportResponseFrame() bool { return false }
|
|
|
+
|
|
|
type ping struct {
|
|
|
ack bool
|
|
|
data [8]byte
|
|
|
}
|
|
|
|
|
|
+func (*ping) isTransportResponseFrame() bool { return true }
|
|
|
+
|
|
|
type outFlowControlSizeRequest struct {
|
|
|
resp chan uint32
|
|
|
}
|
|
|
|
|
|
+func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
|
|
|
+
|
|
|
type outStreamState int
|
|
|
|
|
|
const (
|
|
|
@@ -238,6 +277,14 @@ type controlBuffer struct {
|
|
|
consumerWaiting bool
|
|
|
list *itemList
|
|
|
err error
|
|
|
+
|
|
|
+ // transportResponseFrames counts the number of queued items that represent
|
|
|
+ // the response of an action initiated by the peer. trfChan is created
|
|
|
+ // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
|
|
|
+ // closed and nilled when transportResponseFrames drops below the
|
|
|
+ // threshold. Both fields are protected by mu.
|
|
|
+ transportResponseFrames int
|
|
|
+ trfChan atomic.Value // *chan struct{}
|
|
|
}
|
|
|
|
|
|
func newControlBuffer(done <-chan struct{}) *controlBuffer {
|
|
|
@@ -248,12 +295,24 @@ func newControlBuffer(done <-chan struct{}) *controlBuffer {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (c *controlBuffer) put(it interface{}) error {
|
|
|
+// throttle blocks if there are too many incomingSettings/cleanupStreams in the
|
|
|
+// controlbuf.
|
|
|
+func (c *controlBuffer) throttle() {
|
|
|
+ ch, _ := c.trfChan.Load().(*chan struct{})
|
|
|
+ if ch != nil {
|
|
|
+ select {
|
|
|
+ case <-*ch:
|
|
|
+ case <-c.done:
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (c *controlBuffer) put(it cbItem) error {
|
|
|
_, err := c.executeAndPut(nil, it)
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
-func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
|
|
|
+func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
|
|
|
var wakeUp bool
|
|
|
c.mu.Lock()
|
|
|
if c.err != nil {
|
|
|
@@ -271,6 +330,15 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{
|
|
|
c.consumerWaiting = false
|
|
|
}
|
|
|
c.list.enqueue(it)
|
|
|
+ if it.isTransportResponseFrame() {
|
|
|
+ c.transportResponseFrames++
|
|
|
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
|
|
|
+ // We are adding the frame that puts us over the threshold; create
|
|
|
+ // a throttling channel.
|
|
|
+ ch := make(chan struct{})
|
|
|
+ c.trfChan.Store(&ch)
|
|
|
+ }
|
|
|
+ }
|
|
|
c.mu.Unlock()
|
|
|
if wakeUp {
|
|
|
select {
|
|
|
@@ -304,7 +372,17 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
|
|
|
return nil, c.err
|
|
|
}
|
|
|
if !c.list.isEmpty() {
|
|
|
- h := c.list.dequeue()
|
|
|
+ h := c.list.dequeue().(cbItem)
|
|
|
+ if h.isTransportResponseFrame() {
|
|
|
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
|
|
|
+ // We are removing the frame that put us over the
|
|
|
+ // threshold; close and clear the throttling channel.
|
|
|
+ ch := c.trfChan.Load().(*chan struct{})
|
|
|
+ close(*ch)
|
|
|
+ c.trfChan.Store((*chan struct{})(nil))
|
|
|
+ }
|
|
|
+ c.transportResponseFrames--
|
|
|
+ }
|
|
|
c.mu.Unlock()
|
|
|
return h, nil
|
|
|
}
|