|
|
@@ -159,6 +159,13 @@ type clientStream struct {
|
|
|
|
|
|
peerReset chan struct{} // closed on peer reset
|
|
|
resetErr error // populated before peerReset is closed
|
|
|
+
|
|
|
+ // owned by clientConnReadLoop:
|
|
|
+ headersDone bool // got HEADERS w/ END_HEADERS
|
|
|
+ trailersDone bool // got second HEADERS frame w/ END_HEADERS
|
|
|
+
|
|
|
+ trailer http.Header // accumulated trailers
|
|
|
+ resTrailer http.Header // client's Response.Trailer
|
|
|
}
|
|
|
|
|
|
// awaitRequestCancel runs in its own goroutine and waits for the user's
|
|
|
@@ -918,18 +925,42 @@ func (rl *clientConnReadLoop) processHeaderBlockFragment(frag []byte, streamID u
|
|
|
// server got our RST_STREAM.
|
|
|
return nil
|
|
|
}
|
|
|
+ if cs.headersDone {
|
|
|
+ rl.hdec.SetEmitFunc(cs.onNewTrailerField)
|
|
|
+ } else {
|
|
|
+ rl.hdec.SetEmitFunc(rl.onNewHeaderField)
|
|
|
+ }
|
|
|
_, err := rl.hdec.Write(frag)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return ConnectionError(ErrCodeCompression)
|
|
|
}
|
|
|
if !headersEnded {
|
|
|
rl.continueStreamID = cs.ID
|
|
|
return nil
|
|
|
}
|
|
|
-
|
|
|
// HEADERS (or CONTINUATION) are now over.
|
|
|
rl.continueStreamID = 0
|
|
|
|
|
|
+ if !cs.headersDone {
|
|
|
+ cs.headersDone = true
|
|
|
+ } else {
|
|
|
+ // We're dealing with trailers. (and specifically the
|
|
|
+ // final frame of headers)
|
|
|
+ if cs.trailersDone {
|
|
|
+ // Too many HEADERS frames for this stream.
|
|
|
+ return ConnectionError(ErrCodeProtocol)
|
|
|
+ }
|
|
|
+ cs.trailersDone = true
|
|
|
+ if !streamEnded {
|
|
|
+ // We expect that any header block fragment
|
|
|
+ // frame for trailers with END_HEADERS also
|
|
|
+ // has END_STREAM.
|
|
|
+ return ConnectionError(ErrCodeProtocol)
|
|
|
+ }
|
|
|
+ rl.endStream(cs)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
if rl.reqMalformed != nil {
|
|
|
cs.resc <- resAndError{err: rl.reqMalformed}
|
|
|
rl.cc.writeStreamReset(cs.ID, ErrCodeProtocol, rl.reqMalformed)
|
|
|
@@ -970,6 +1001,7 @@ func (rl *clientConnReadLoop) processHeaderBlockFragment(frag []byte, streamID u
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ cs.resTrailer = res.Trailer
|
|
|
rl.activeRes[cs.ID] = cs
|
|
|
cs.resc <- resAndError{res: res}
|
|
|
rl.nextRes = nil // unused now; will be reset next HEADERS frame
|
|
|
@@ -1076,12 +1108,24 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
|
|
}
|
|
|
|
|
|
if f.StreamEnded() {
|
|
|
- cs.bufPipe.CloseWithError(io.EOF)
|
|
|
- delete(rl.activeRes, cs.ID)
|
|
|
+ rl.endStream(cs)
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+func (rl *clientConnReadLoop) endStream(cs *clientStream) {
|
|
|
+ // TODO: check that any declared content-length matches, like
|
|
|
+ // server.go's (*stream).endStream method.
|
|
|
+ cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
|
|
|
+ delete(rl.activeRes, cs.ID)
|
|
|
+}
|
|
|
+
|
|
|
+func (cs *clientStream) copyTrailers() {
|
|
|
+ for k, vv := range cs.trailer {
|
|
|
+ cs.resTrailer[k] = vv
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
|
|
|
cc := rl.cc
|
|
|
cc.t.connPool().MarkDead(cc)
|
|
|
@@ -1203,6 +1247,7 @@ func (rl *clientConnReadLoop) onNewHeaderField(f hpack.HeaderField) {
|
|
|
if VerboseLogs {
|
|
|
cc.logf("Header field: %+v", f)
|
|
|
}
|
|
|
+ // TODO: enforce max header list size like server.
|
|
|
isPseudo := strings.HasPrefix(f.Name, ":")
|
|
|
if isPseudo {
|
|
|
if rl.sawRegHeader {
|
|
|
@@ -1226,7 +1271,38 @@ func (rl *clientConnReadLoop) onNewHeaderField(f hpack.HeaderField) {
|
|
|
}
|
|
|
} else {
|
|
|
rl.sawRegHeader = true
|
|
|
- rl.nextRes.Header.Add(http.CanonicalHeaderKey(f.Name), f.Value)
|
|
|
+ key := http.CanonicalHeaderKey(f.Name)
|
|
|
+ if key == "Trailer" {
|
|
|
+ t := rl.nextRes.Trailer
|
|
|
+ if t == nil {
|
|
|
+ t = make(http.Header)
|
|
|
+ rl.nextRes.Trailer = t
|
|
|
+ }
|
|
|
+ foreachHeaderElement(f.Value, func(v string) {
|
|
|
+ t[http.CanonicalHeaderKey(v)] = nil
|
|
|
+ })
|
|
|
+ } else {
|
|
|
+ rl.nextRes.Header.Add(key, f.Value)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (cs *clientStream) onNewTrailerField(f hpack.HeaderField) {
|
|
|
+ isPseudo := strings.HasPrefix(f.Name, ":")
|
|
|
+ if isPseudo {
|
|
|
+ // TODO: Bogus. report an error later when we close their body.
|
|
|
+ // drop for now.
|
|
|
+ return
|
|
|
+ }
|
|
|
+ key := http.CanonicalHeaderKey(f.Name)
|
|
|
+ if _, ok := cs.resTrailer[key]; ok {
|
|
|
+ if cs.trailer == nil {
|
|
|
+ cs.trailer = make(http.Header)
|
|
|
+ }
|
|
|
+ const tooBig = 1000 // TODO: arbitrary; use max header list size limits
|
|
|
+ if cur := cs.trailer[key]; len(cur) < tooBig {
|
|
|
+ cs.trailer[key] = append(cur, f.Value)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|