Browse Source

grpcproxy: fix stream closing issue

Xiang Li 9 years ago
parent
commit
9ef0f5ef8a
1 changed files with 15 additions and 5 deletions
  1. 15 5
      proxy/grpcproxy/watch_client_adapter.go

+ 15 - 5
proxy/grpcproxy/watch_client_adapter.go

@@ -32,7 +32,11 @@ func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_Wa
 	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
 	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
 	wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}}
 	wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}}
 	wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}}}
 	wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}}}
-	go s.wserv.Watch(wserver)
+	go func() {
+		s.wserv.Watch(wserver)
+		// close the server side sender
+		close(ch1)
+	}()
 	return wclient, nil
 	return wclient, nil
 }
 }
 
 
@@ -88,7 +92,7 @@ func (ss *chanServerStream) SetTrailer(md metadata.MD) {
 type chanClientStream struct {
 type chanClientStream struct {
 	headerc  <-chan metadata.MD
 	headerc  <-chan metadata.MD
 	trailerc <-chan metadata.MD
 	trailerc <-chan metadata.MD
-	grpc.Stream
+	*chanStream
 }
 }
 
 
 func (cs *chanClientStream) Header() (metadata.MD, error) {
 func (cs *chanClientStream) Header() (metadata.MD, error) {
@@ -109,7 +113,10 @@ func (cs *chanClientStream) Trailer() metadata.MD {
 	}
 	}
 }
 }
 
 
-func (s *chanClientStream) CloseSend() error { return nil }
+func (s *chanClientStream) CloseSend() error {
+	close(s.chanStream.sendc)
+	return nil
+}
 
 
 // chanStream implements grpc.Stream using channels
 // chanStream implements grpc.Stream using channels
 type chanStream struct {
 type chanStream struct {
@@ -132,8 +139,11 @@ func (s *chanStream) SendMsg(m interface{}) error {
 func (s *chanStream) RecvMsg(m interface{}) error {
 func (s *chanStream) RecvMsg(m interface{}) error {
 	v := m.(*interface{})
 	v := m.(*interface{})
 	select {
 	select {
-	case m = <-s.recvc:
-		*v = m
+	case msg, ok := <-s.recvc:
+		if !ok {
+			return grpc.ErrClientConnClosing
+		}
+		*v = msg
 		return nil
 		return nil
 	case <-s.ctx.Done():
 	case <-s.ctx.Done():
 	}
 	}