Browse Source

proxy/grpcproxy: add chanStream helper

Prelimiary work for maintenance API in adapter

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Gyu-Ho Lee 8 years ago
parent
commit
caa73c176f

+ 27 - 0
proxy/grpcproxy/adapter/chan_stream.go

@@ -136,3 +136,30 @@ func (s *chanStream) RecvMsg(m interface{}) error {
 	}
 	}
 	return s.ctx.Err()
 	return s.ctx.Err()
 }
 }
+
+func newPipeStream(ctx context.Context, ssHandler func(chanServerStream) error) chanClientStream {
+	// ch1 is buffered so server can send error on close
+	ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
+	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
+
+	cctx, ccancel := context.WithCancel(ctx)
+	cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
+	cs := chanClientStream{headerc, trailerc, cli}
+
+	sctx, scancel := context.WithCancel(ctx)
+	srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
+	ss := chanServerStream{headerc, trailerc, srv, nil}
+
+	go func() {
+		if err := ssHandler(ss); err != nil {
+			select {
+			case srv.sendc <- err:
+			case <-sctx.Done():
+			case <-cctx.Done():
+			}
+		}
+		scancel()
+		ccancel()
+	}()
+	return cs
+}

+ 4 - 24
proxy/grpcproxy/adapter/lease_client_adapter.go

@@ -19,7 +19,6 @@ import (
 
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/metadata"
 )
 )
 
 
 type ls2lc struct {
 type ls2lc struct {
@@ -39,29 +38,10 @@ func (c *ls2lc) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts
 }
 }
 
 
 func (c *ls2lc) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (pb.Lease_LeaseKeepAliveClient, error) {
 func (c *ls2lc) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (pb.Lease_LeaseKeepAliveClient, error) {
-	// ch1 is buffered so server can send error on close
-	ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
-	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
-
-	cctx, ccancel := context.WithCancel(ctx)
-	cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
-	lclient := &ls2lcClientStream{chanClientStream{headerc, trailerc, cli}}
-
-	sctx, scancel := context.WithCancel(ctx)
-	srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
-	lserver := &ls2lcServerStream{chanServerStream{headerc, trailerc, srv, nil}}
-	go func() {
-		if err := c.leaseServer.LeaseKeepAlive(lserver); err != nil {
-			select {
-			case srv.sendc <- err:
-			case <-sctx.Done():
-			case <-cctx.Done():
-			}
-		}
-		scancel()
-		ccancel()
-	}()
-	return lclient, nil
+	cs := newPipeStream(ctx, func(ss chanServerStream) error {
+		return c.leaseServer.LeaseKeepAlive(&ls2lcServerStream{ss})
+	})
+	return &ls2lcClientStream{cs}, nil
 }
 }
 
 
 func (c *ls2lc) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (*pb.LeaseTimeToLiveResponse, error) {
 func (c *ls2lc) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (*pb.LeaseTimeToLiveResponse, error) {

+ 4 - 24
proxy/grpcproxy/adapter/watch_client_adapter.go

@@ -20,7 +20,6 @@ import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/metadata"
 )
 )
 
 
 var errAlreadySentHeader = errors.New("adapter: already sent header")
 var errAlreadySentHeader = errors.New("adapter: already sent header")
@@ -32,29 +31,10 @@ func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient {
 }
 }
 
 
 func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) {
 func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) {
-	// ch1 is buffered so server can send error on close
-	ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
-	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
-
-	cctx, ccancel := context.WithCancel(ctx)
-	cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
-	wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, cli}}
-
-	sctx, scancel := context.WithCancel(ctx)
-	srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
-	wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, srv, nil}}
-	go func() {
-		if err := s.wserv.Watch(wserver); err != nil {
-			select {
-			case srv.sendc <- err:
-			case <-sctx.Done():
-			case <-cctx.Done():
-			}
-		}
-		scancel()
-		ccancel()
-	}()
-	return wclient, nil
+	cs := newPipeStream(ctx, func(ss chanServerStream) error {
+		return s.wserv.Watch(&ws2wcServerStream{ss})
+	})
+	return &ws2wcClientStream{cs}, nil
 }
 }
 
 
 // ws2wcClientStream implements Watch_WatchClient
 // ws2wcClientStream implements Watch_WatchClient