Browse Source

grpcproxy: refactor chan stream out of watch_client_adapter

fanmin shi 8 years ago
parent
commit
05b82f2022
2 changed files with 132 additions and 110 deletions
  1. 132 0
      proxy/grpcproxy/chan_stream.go
  2. 0 110
      proxy/grpcproxy/watch_client_adapter.go

+ 132 - 0
proxy/grpcproxy/chan_stream.go

@@ -0,0 +1,132 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"golang.org/x/net/context"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+)
+
+// chanServerStream implements grpc.ServerStream with a chanStream
+type chanServerStream struct {
+	headerc  chan<- metadata.MD
+	trailerc chan<- metadata.MD
+	grpc.Stream
+
+	headers []metadata.MD
+}
+
+func (ss *chanServerStream) SendHeader(md metadata.MD) error {
+	if ss.headerc == nil {
+		return errAlreadySentHeader
+	}
+	outmd := make(map[string][]string)
+	for _, h := range append(ss.headers, md) {
+		for k, v := range h {
+			outmd[k] = v
+		}
+	}
+	select {
+	case ss.headerc <- outmd:
+		ss.headerc = nil
+		ss.headers = nil
+		return nil
+	case <-ss.Context().Done():
+	}
+	return ss.Context().Err()
+}
+
+func (ss *chanServerStream) SetHeader(md metadata.MD) error {
+	if ss.headerc == nil {
+		return errAlreadySentHeader
+	}
+	ss.headers = append(ss.headers, md)
+	return nil
+}
+
+func (ss *chanServerStream) SetTrailer(md metadata.MD) {
+	ss.trailerc <- md
+}
+
+// chanClientStream implements grpc.ClientStream with a chanStream
+type chanClientStream struct {
+	headerc  <-chan metadata.MD
+	trailerc <-chan metadata.MD
+	*chanStream
+}
+
+func (cs *chanClientStream) Header() (metadata.MD, error) {
+	select {
+	case md := <-cs.headerc:
+		return md, nil
+	case <-cs.Context().Done():
+	}
+	return nil, cs.Context().Err()
+}
+
+func (cs *chanClientStream) Trailer() metadata.MD {
+	select {
+	case md := <-cs.trailerc:
+		return md
+	case <-cs.Context().Done():
+		return nil
+	}
+}
+
+func (cs *chanClientStream) CloseSend() error {
+	close(cs.chanStream.sendc)
+	return nil
+}
+
+// chanStream implements grpc.Stream using channels
+type chanStream struct {
+	recvc  <-chan interface{}
+	sendc  chan<- interface{}
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func (s *chanStream) Context() context.Context { return s.ctx }
+
+func (s *chanStream) SendMsg(m interface{}) error {
+	select {
+	case s.sendc <- m:
+		if err, ok := m.(error); ok {
+			return err
+		}
+		return nil
+	case <-s.ctx.Done():
+	}
+	return s.ctx.Err()
+}
+
+func (s *chanStream) RecvMsg(m interface{}) error {
+	v := m.(*interface{})
+	select {
+	case msg, ok := <-s.recvc:
+		if !ok {
+			return grpc.ErrClientConnClosing
+		}
+		if err, ok := msg.(error); ok {
+			return err
+		}
+		*v = msg
+		return nil
+	case <-s.ctx.Done():
+	}
+	return s.ctx.Err()
+}

+ 0 - 110
proxy/grpcproxy/watch_client_adapter.go

@@ -84,113 +84,3 @@ func (s *ws2wcServerStream) Recv() (*pb.WatchRequest, error) {
 	}
 	}
 	return v.(*pb.WatchRequest), nil
 	return v.(*pb.WatchRequest), nil
 }
 }
-
-// chanServerStream implements grpc.ServerStream with a chanStream
-type chanServerStream struct {
-	headerc  chan<- metadata.MD
-	trailerc chan<- metadata.MD
-	grpc.Stream
-
-	headers []metadata.MD
-}
-
-func (ss *chanServerStream) SendHeader(md metadata.MD) error {
-	if ss.headerc == nil {
-		return errAlreadySentHeader
-	}
-	outmd := make(map[string][]string)
-	for _, h := range append(ss.headers, md) {
-		for k, v := range h {
-			outmd[k] = v
-		}
-	}
-	select {
-	case ss.headerc <- outmd:
-		ss.headerc = nil
-		ss.headers = nil
-		return nil
-	case <-ss.Context().Done():
-	}
-	return ss.Context().Err()
-}
-
-func (ss *chanServerStream) SetHeader(md metadata.MD) error {
-	if ss.headerc == nil {
-		return errAlreadySentHeader
-	}
-	ss.headers = append(ss.headers, md)
-	return nil
-}
-
-func (ss *chanServerStream) SetTrailer(md metadata.MD) {
-	ss.trailerc <- md
-}
-
-// chanClientStream implements grpc.ClientStream with a chanStream
-type chanClientStream struct {
-	headerc  <-chan metadata.MD
-	trailerc <-chan metadata.MD
-	*chanStream
-}
-
-func (cs *chanClientStream) Header() (metadata.MD, error) {
-	select {
-	case md := <-cs.headerc:
-		return md, nil
-	case <-cs.Context().Done():
-	}
-	return nil, cs.Context().Err()
-}
-
-func (cs *chanClientStream) Trailer() metadata.MD {
-	select {
-	case md := <-cs.trailerc:
-		return md
-	case <-cs.Context().Done():
-		return nil
-	}
-}
-
-func (s *chanClientStream) CloseSend() error {
-	close(s.chanStream.sendc)
-	return nil
-}
-
-// chanStream implements grpc.Stream using channels
-type chanStream struct {
-	recvc  <-chan interface{}
-	sendc  chan<- interface{}
-	ctx    context.Context
-	cancel context.CancelFunc
-}
-
-func (s *chanStream) Context() context.Context { return s.ctx }
-
-func (s *chanStream) SendMsg(m interface{}) error {
-	select {
-	case s.sendc <- m:
-		if err, ok := m.(error); ok {
-			return err
-		}
-		return nil
-	case <-s.ctx.Done():
-	}
-	return s.ctx.Err()
-}
-
-func (s *chanStream) RecvMsg(m interface{}) error {
-	v := m.(*interface{})
-	select {
-	case msg, ok := <-s.recvc:
-		if !ok {
-			return grpc.ErrClientConnClosing
-		}
-		if err, ok := msg.(error); ok {
-			return err
-		}
-		*v = msg
-		return nil
-	case <-s.ctx.Done():
-	}
-	return s.ctx.Err()
-}