|
@@ -31,6 +31,8 @@ const (
|
|
|
firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
|
|
firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+// TODO: automatic 100-continue
|
|
|
|
|
+
|
|
|
// TODO: finish GOAWAY support. Consider each incoming frame type and
|
|
// TODO: finish GOAWAY support. Consider each incoming frame type and
|
|
|
// whether it should be ignored during a shutdown race.
|
|
// whether it should be ignored during a shutdown race.
|
|
|
|
|
|
|
@@ -398,6 +400,13 @@ func (sc *serverConn) readPreface() error {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// should be called from non-serve() goroutines, otherwise the ends may deadlock
|
|
|
|
|
+// the serve loop. (it's only buffered a little bit).
|
|
|
|
|
+func (sc *serverConn) writeFrame(wm frameWriteMsg) {
|
|
|
|
|
+ sc.serveG.checkNotOn() // note the "NOT"
|
|
|
|
|
+ sc.wantWriteFrameCh <- wm
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (sc *serverConn) enqueueFrameWrite(wm frameWriteMsg) {
|
|
func (sc *serverConn) enqueueFrameWrite(wm frameWriteMsg) {
|
|
|
sc.serveG.check()
|
|
sc.serveG.check()
|
|
|
// Fast path for common case:
|
|
// Fast path for common case:
|
|
@@ -413,7 +422,7 @@ func (sc *serverConn) enqueueSettingsAck() {
|
|
|
sc.serveG.check()
|
|
sc.serveG.check()
|
|
|
// Fast path for common case:
|
|
// Fast path for common case:
|
|
|
if !sc.writingFrame {
|
|
if !sc.writingFrame {
|
|
|
- sc.wantWriteFrameCh <- frameWriteMsg{write: (*serverConn).writeSettingsAck}
|
|
|
|
|
|
|
+ sc.writeFrameCh <- frameWriteMsg{write: (*serverConn).writeSettingsAck}
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
sc.needToSendSettingsAck = true
|
|
sc.needToSendSettingsAck = true
|
|
@@ -572,10 +581,10 @@ func (sc *serverConn) processPing(f *PingFrame) error {
|
|
|
// PROTOCOL_ERROR."
|
|
// PROTOCOL_ERROR."
|
|
|
return ConnectionError(ErrCodeProtocol)
|
|
return ConnectionError(ErrCodeProtocol)
|
|
|
}
|
|
}
|
|
|
- sc.wantWriteFrameCh <- frameWriteMsg{
|
|
|
|
|
|
|
+ sc.enqueueFrameWrite(frameWriteMsg{
|
|
|
write: (*serverConn).writePingAck,
|
|
write: (*serverConn).writePingAck,
|
|
|
v: f,
|
|
v: f,
|
|
|
- }
|
|
|
|
|
|
|
+ })
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -904,6 +913,7 @@ type headerWriteReq struct {
|
|
|
// called from handler goroutines.
|
|
// called from handler goroutines.
|
|
|
// h may be nil.
|
|
// h may be nil.
|
|
|
func (sc *serverConn) writeHeaders(req headerWriteReq) {
|
|
func (sc *serverConn) writeHeaders(req headerWriteReq) {
|
|
|
|
|
+ sc.serveG.checkNotOn()
|
|
|
var errc chan error
|
|
var errc chan error
|
|
|
if req.h != nil {
|
|
if req.h != nil {
|
|
|
// If there's a header map (which we don't own), so we have to block on
|
|
// If there's a header map (which we don't own), so we have to block on
|
|
@@ -912,12 +922,12 @@ func (sc *serverConn) writeHeaders(req headerWriteReq) {
|
|
|
// mutates it.
|
|
// mutates it.
|
|
|
errc = make(chan error, 1)
|
|
errc = make(chan error, 1)
|
|
|
}
|
|
}
|
|
|
- sc.wantWriteFrameCh <- frameWriteMsg{
|
|
|
|
|
|
|
+ sc.writeFrame(frameWriteMsg{
|
|
|
write: (*serverConn).writeHeadersFrame,
|
|
write: (*serverConn).writeHeadersFrame,
|
|
|
v: req,
|
|
v: req,
|
|
|
streamID: req.streamID,
|
|
streamID: req.streamID,
|
|
|
done: errc,
|
|
done: errc,
|
|
|
- }
|
|
|
|
|
|
|
+ })
|
|
|
if errc != nil {
|
|
if errc != nil {
|
|
|
<-errc
|
|
<-errc
|
|
|
}
|
|
}
|
|
@@ -974,19 +984,19 @@ type windowUpdateReq struct {
|
|
|
func (sc *serverConn) sendWindowUpdate(streamID uint32, n int) {
|
|
func (sc *serverConn) sendWindowUpdate(streamID uint32, n int) {
|
|
|
const maxUint32 = 2147483647
|
|
const maxUint32 = 2147483647
|
|
|
for n >= maxUint32 {
|
|
for n >= maxUint32 {
|
|
|
- sc.wantWriteFrameCh <- frameWriteMsg{
|
|
|
|
|
|
|
+ sc.writeFrame(frameWriteMsg{
|
|
|
write: (*serverConn).sendWindowUpdateInLoop,
|
|
write: (*serverConn).sendWindowUpdateInLoop,
|
|
|
v: windowUpdateReq{streamID, maxUint32},
|
|
v: windowUpdateReq{streamID, maxUint32},
|
|
|
streamID: streamID,
|
|
streamID: streamID,
|
|
|
- }
|
|
|
|
|
|
|
+ })
|
|
|
n -= maxUint32
|
|
n -= maxUint32
|
|
|
}
|
|
}
|
|
|
if n > 0 {
|
|
if n > 0 {
|
|
|
- sc.wantWriteFrameCh <- frameWriteMsg{
|
|
|
|
|
|
|
+ sc.writeFrame(frameWriteMsg{
|
|
|
write: (*serverConn).sendWindowUpdateInLoop,
|
|
write: (*serverConn).sendWindowUpdateInLoop,
|
|
|
v: windowUpdateReq{streamID, uint32(n)},
|
|
v: windowUpdateReq{streamID, uint32(n)},
|
|
|
streamID: streamID,
|
|
streamID: streamID,
|
|
|
- }
|
|
|
|
|
|
|
+ })
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1123,13 +1133,13 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
|
|
|
rws.curChunkIsFinal = rws.handlerDone
|
|
rws.curChunkIsFinal = rws.handlerDone
|
|
|
|
|
|
|
|
// TODO: await flow control tokens for both stream and conn
|
|
// TODO: await flow control tokens for both stream and conn
|
|
|
- rws.sc.wantWriteFrameCh <- frameWriteMsg{
|
|
|
|
|
|
|
+ rws.sc.writeFrame(frameWriteMsg{
|
|
|
cost: uint32(len(p)),
|
|
cost: uint32(len(p)),
|
|
|
streamID: rws.streamID,
|
|
streamID: rws.streamID,
|
|
|
write: (*serverConn).writeDataFrame,
|
|
write: (*serverConn).writeDataFrame,
|
|
|
done: rws.chunkWrittenCh,
|
|
done: rws.chunkWrittenCh,
|
|
|
v: rws, // writeDataInLoop uses only rws.curChunk and rws.curChunkIsFinal
|
|
v: rws, // writeDataInLoop uses only rws.curChunk and rws.curChunkIsFinal
|
|
|
- }
|
|
|
|
|
|
|
+ })
|
|
|
err = <-rws.chunkWrittenCh // block until it's written
|
|
err = <-rws.chunkWrittenCh // block until it's written
|
|
|
return len(p), err
|
|
return len(p), err
|
|
|
}
|
|
}
|