Browse Source

Merge pull request #6770 from heyitsanthony/fix-grpc

grpcproxy: add SetHeader support to ServerStream
Anthony Romano 9 years ago
parent
commit
b30dc10812
1 changed files with 27 additions and 2 deletions
  1. 27 2
      proxy/grpcproxy/watch_client_adapter.go

+ 27 - 2
proxy/grpcproxy/watch_client_adapter.go

@@ -15,12 +15,16 @@
 package grpcproxy
 package grpcproxy
 
 
 import (
 import (
+	"errors"
+
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/metadata"
 )
 )
 
 
+var errAlreadySentHeader = errors.New("grpcproxy: already send header")
+
 type ws2wc struct{ wserv pb.WatchServer }
 type ws2wc struct{ wserv pb.WatchServer }
 
 
 func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient {
 func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient {
@@ -31,7 +35,7 @@ func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_Wa
 	ch1, ch2 := make(chan interface{}), make(chan interface{})
 	ch1, ch2 := make(chan interface{}), make(chan interface{})
 	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
 	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
 	wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}}
 	wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}}
-	wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}}}
+	wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}, nil}}
 	go func() {
 	go func() {
 		s.wserv.Watch(wserver)
 		s.wserv.Watch(wserver)
 		// close the server side sender
 		// close the server side sender
@@ -73,17 +77,38 @@ type chanServerStream struct {
 	headerc  chan<- metadata.MD
 	headerc  chan<- metadata.MD
 	trailerc chan<- metadata.MD
 	trailerc chan<- metadata.MD
 	grpc.Stream
 	grpc.Stream
+
+	headers []metadata.MD
 }
 }
 
 
 func (ss *chanServerStream) SendHeader(md metadata.MD) error {
 func (ss *chanServerStream) SendHeader(md metadata.MD) error {
+	if ss.headerc == nil {
+		return errAlreadySentHeader
+	}
+	outmd := make(map[string][]string)
+	for _, h := range append(ss.headers, md) {
+		for k, v := range h {
+			outmd[k] = v
+		}
+	}
 	select {
 	select {
-	case ss.headerc <- md:
+	case ss.headerc <- outmd:
+		ss.headerc = nil
+		ss.headers = nil
 		return nil
 		return nil
 	case <-ss.Context().Done():
 	case <-ss.Context().Done():
 	}
 	}
 	return ss.Context().Err()
 	return ss.Context().Err()
 }
 }
 
 
+func (ss *chanServerStream) SetHeader(md metadata.MD) error {
+	if ss.headerc == nil {
+		return errAlreadySentHeader
+	}
+	ss.headers = append(ss.headers, md)
+	return nil
+}
+
 func (ss *chanServerStream) SetTrailer(md metadata.MD) {
 func (ss *chanServerStream) SetTrailer(md metadata.MD) {
 	ss.trailerc <- md
 	ss.trailerc <- md
 }
 }