|
|
@@ -729,26 +729,28 @@ var errChanPool = sync.Pool{
|
|
|
New: func() interface{} { return make(chan error, 1) },
|
|
|
}
|
|
|
|
|
|
-// writeDataFromHandler writes the data described in req to stream.id.
|
|
|
-//
|
|
|
-// The flow control currently happens in the Handler where it waits
|
|
|
-// for 1 or more bytes to be available to then write here. So at this
|
|
|
-// point we know that we have flow control. But this might have to
|
|
|
-// change when priority is implemented, so the serve goroutine knows
|
|
|
-// the total amount of bytes waiting to be sent and can can have more
|
|
|
-// scheduling decisions available.
|
|
|
-func (sc *serverConn) writeDataFromHandler(stream *stream, writeData *writeData) error {
|
|
|
+var writeDataPool = sync.Pool{
|
|
|
+ New: func() interface{} { return new(writeData) },
|
|
|
+}
|
|
|
+
|
|
|
+// writeDataFromHandler writes DATA response frames from a handler on
|
|
|
+// the given stream.
|
|
|
+func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
|
|
|
ch := errChanPool.Get().(chan error)
|
|
|
+ writeArg := writeDataPool.Get().(*writeData)
|
|
|
+ *writeArg = writeData{stream.id, data, endStream}
|
|
|
err := sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
- write: writeData,
|
|
|
+ write: writeArg,
|
|
|
stream: stream,
|
|
|
done: ch,
|
|
|
})
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ var frameWriteDone bool // the frame write is done (successfully or not)
|
|
|
select {
|
|
|
case err = <-ch:
|
|
|
+ frameWriteDone = true
|
|
|
case <-sc.doneServing:
|
|
|
return errClientDisconnected
|
|
|
case <-stream.cw:
|
|
|
@@ -761,11 +763,15 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, writeData *writeData)
|
|
|
// close.
|
|
|
select {
|
|
|
case err = <-ch:
|
|
|
+ frameWriteDone = true
|
|
|
default:
|
|
|
return errStreamClosed
|
|
|
}
|
|
|
}
|
|
|
errChanPool.Put(ch)
|
|
|
+ if frameWriteDone {
|
|
|
+ writeDataPool.Put(writeArg)
|
|
|
+ }
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
@@ -1713,7 +1719,6 @@ type responseWriterState struct {
|
|
|
wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
|
|
|
sentHeader bool // have we sent the header frame?
|
|
|
handlerDone bool // handler has finished
|
|
|
- curWrite writeData
|
|
|
|
|
|
closeNotifierMu sync.Mutex // guards closeNotifierCh
|
|
|
closeNotifierCh chan bool // nil until first used
|
|
|
@@ -1762,14 +1767,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
|
|
|
return 0, nil
|
|
|
}
|
|
|
|
|
|
- // Reuse curWrite (which as a pointer fits into the
|
|
|
- // 'writeFramer' interface value) for each write to avoid an
|
|
|
- // allocation per write.
|
|
|
- curWrite := &rws.curWrite
|
|
|
- curWrite.streamID = rws.stream.id
|
|
|
- curWrite.p = p
|
|
|
- curWrite.endStream = rws.handlerDone
|
|
|
- if err := rws.conn.writeDataFromHandler(rws.stream, curWrite); err != nil {
|
|
|
+ if err := rws.conn.writeDataFromHandler(rws.stream, p, rws.handlerDone); err != nil {
|
|
|
return 0, err
|
|
|
}
|
|
|
return len(p), nil
|