|
|
@@ -34,34 +34,47 @@ func ForwardResponseStream(ctx context.Context, mux *ServeMux, marshaler Marshal
|
|
|
w.Header().Set("Transfer-Encoding", "chunked")
|
|
|
w.Header().Set("Content-Type", marshaler.ContentType())
|
|
|
if err := handleForwardResponseOptions(ctx, w, nil, opts); err != nil {
|
|
|
- http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
|
+ HTTPError(ctx, mux, marshaler, w, req, err)
|
|
|
return
|
|
|
}
|
|
|
- w.WriteHeader(http.StatusOK)
|
|
|
- f.Flush()
|
|
|
+
|
|
|
+ var delimiter []byte
|
|
|
+ if d, ok := marshaler.(Delimited); ok {
|
|
|
+ delimiter = d.Delimiter()
|
|
|
+ } else {
|
|
|
+ delimiter = []byte("\n")
|
|
|
+ }
|
|
|
+
|
|
|
+ var wroteHeader bool
|
|
|
for {
|
|
|
resp, err := recv()
|
|
|
if err == io.EOF {
|
|
|
return
|
|
|
}
|
|
|
if err != nil {
|
|
|
- handleForwardResponseStreamError(marshaler, w, err)
|
|
|
+ handleForwardResponseStreamError(wroteHeader, marshaler, w, err)
|
|
|
return
|
|
|
}
|
|
|
if err := handleForwardResponseOptions(ctx, w, resp, opts); err != nil {
|
|
|
- handleForwardResponseStreamError(marshaler, w, err)
|
|
|
+ handleForwardResponseStreamError(wroteHeader, marshaler, w, err)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
buf, err := marshaler.Marshal(streamChunk(resp, nil))
|
|
|
if err != nil {
|
|
|
grpclog.Printf("Failed to marshal response chunk: %v", err)
|
|
|
+ handleForwardResponseStreamError(wroteHeader, marshaler, w, err)
|
|
|
return
|
|
|
}
|
|
|
if _, err = w.Write(buf); err != nil {
|
|
|
grpclog.Printf("Failed to send response chunk: %v", err)
|
|
|
return
|
|
|
}
|
|
|
+ wroteHeader = true
|
|
|
+ if _, err = w.Write(delimiter); err != nil {
|
|
|
+ grpclog.Printf("Failed to send delimiter chunk: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
f.Flush()
|
|
|
}
|
|
|
}
|
|
|
@@ -134,13 +147,20 @@ func handleForwardResponseOptions(ctx context.Context, w http.ResponseWriter, re
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func handleForwardResponseStreamError(marshaler Marshaler, w http.ResponseWriter, err error) {
|
|
|
+func handleForwardResponseStreamError(wroteHeader bool, marshaler Marshaler, w http.ResponseWriter, err error) {
|
|
|
buf, merr := marshaler.Marshal(streamChunk(nil, err))
|
|
|
if merr != nil {
|
|
|
grpclog.Printf("Failed to marshal an error: %v", merr)
|
|
|
return
|
|
|
}
|
|
|
- if _, werr := fmt.Fprintf(w, "%s\n", buf); werr != nil {
|
|
|
+ if !wroteHeader {
|
|
|
+ s, ok := status.FromError(err)
|
|
|
+ if !ok {
|
|
|
+ s = status.New(codes.Unknown, err.Error())
|
|
|
+ }
|
|
|
+ w.WriteHeader(HTTPStatusFromCode(s.Code()))
|
|
|
+ }
|
|
|
+ if _, werr := w.Write(buf); werr != nil {
|
|
|
grpclog.Printf("Failed to notify error to client: %v", werr)
|
|
|
return
|
|
|
}
|