Merge pull request #7221 from fanminshi/grpcproxy_support_lease_coalescing

grpcproxy: support lease coalescing
fanmin shi, 8 years ago
commit a5cf7fdc87

+ 6 - 3
clientv3/lease.go

@@ -144,16 +144,19 @@ type keepAlive struct {
 }
 
 func NewLease(c *Client) Lease {
+	return NewLeaseFromLeaseClient(RetryLeaseClient(c), c.cfg.DialTimeout+time.Second)
+}
+
+func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease {
 	l := &lessor{
 		donec:                 make(chan struct{}),
 		keepAlives:            make(map[LeaseID]*keepAlive),
-		remote:                RetryLeaseClient(c),
-		firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
+		remote:                remote,
+		firstKeepAliveTimeout: keepAliveTimeout,
 	}
 	if l.firstKeepAliveTimeout == time.Second {
 		l.firstKeepAliveTimeout = defaultTTL
 	}
-
 	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
 	return l
 }
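Note on the sentinel above: NewLease passes c.cfg.DialTimeout+time.Second, so an unset (zero) DialTimeout arrives as exactly time.Second and is promoted to defaultTTL. Splitting the constructor this way lets any pb.LeaseClient implementation back a clientv3.Lease. A minimal sketch of the new entry point (the remote variable and the 5-second timeout are illustrative, not part of this patch):

	// remote implements pb.LeaseClient, e.g. a proxy-side adapter.
	lease := clientv3.NewLeaseFromLeaseClient(remote, 5*time.Second)
	defer lease.Close()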

+ 1 - 1
etcdmain/grpc_proxy.go

@@ -106,7 +106,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 	kvp, _ := grpcproxy.NewKvProxy(client)
 	watchp, _ := grpcproxy.NewWatchProxy(client)
 	clusterp := grpcproxy.NewClusterProxy(client)
-	leasep := grpcproxy.NewLeaseProxy(client)
+	leasep, _ := grpcproxy.NewLeaseProxy(client)
 	mainp := grpcproxy.NewMaintenanceProxy(client)
 	authp := grpcproxy.NewAuthProxy(client)
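NewLeaseProxy now returns a done channel alongside the server, and startGRPCProxy discards it with the blank identifier, as it does for kvp and watchp. A caller that wants teardown notification could retain it instead; a sketch (the goroutine is an assumption, not in this patch):

	leasep, leasepch := grpcproxy.NewLeaseProxy(client)
	go func() {
		<-leasepch // closed once the leader watcher stops and all streams drain
	}()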
 

+ 10 - 5
integration/cluster_proxy.go

@@ -33,6 +33,7 @@ type grpcClientProxy struct {
 	grpc    grpcAPI
 	wdonec  <-chan struct{}
 	kvdonec <-chan struct{}
+	lpdonec <-chan struct{}
 }
 
 func toGRPC(c *clientv3.Client) grpcAPI {
@@ -42,18 +43,18 @@ func toGRPC(c *clientv3.Client) grpcAPI {
 	if v, ok := proxies[c]; ok {
 		return v.grpc
 	}
-
-	wp, wpch := grpcproxy.NewWatchProxy(c)
 	kvp, kvpch := grpcproxy.NewKvProxy(c)
+	wp, wpch := grpcproxy.NewWatchProxy(c)
+	lp, lpch := grpcproxy.NewLeaseProxy(c)
 	grpc := grpcAPI{
 		pb.NewClusterClient(c.ActiveConnection()),
 		grpcproxy.KvServerToKvClient(kvp),
-		pb.NewLeaseClient(c.ActiveConnection()),
+		grpcproxy.LeaseServerToLeaseClient(lp),
 		grpcproxy.WatchServerToWatchClient(wp),
 		pb.NewMaintenanceClient(c.ActiveConnection()),
 		pb.NewAuthClient(c.ActiveConnection()),
 	}
-	proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch}
+	proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch}
 	return grpc
 }
 
@@ -61,13 +62,15 @@ type proxyCloser struct {
 	clientv3.Watcher
 	wdonec  <-chan struct{}
 	kvdonec <-chan struct{}
+	lpdonec <-chan struct{}
 }
 
 func (pc *proxyCloser) Close() error {
-	// client ctx is canceled before calling close, so kv will close out
+	// client ctx is canceled before calling close, so kv and lp will close out
 	<-pc.kvdonec
 	err := pc.Watcher.Close()
 	<-pc.wdonec
+	<-pc.lpdonec
 	return err
 }
 
@@ -79,10 +82,12 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
 	rpc := toGRPC(c)
 	c.KV = clientv3.NewKVFromKVClient(rpc.KV)
 	pmu.Lock()
+	c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)
 	c.Watcher = &proxyCloser{
 		Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
 		wdonec:  proxies[c].wdonec,
 		kvdonec: proxies[c].kvdonec,
+		lpdonec: proxies[c].lpdonec,
 	}
 	pmu.Unlock()
 	return c, nil
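In the proxy variant of the integration tests, newClientV3 stands in for clientv3.New, so every test client transparently exercises the kv, watch, and now lease proxies. A usage sketch (endpoint and timeout are illustrative):

	cfg := clientv3.Config{Endpoints: []string{"localhost:2379"}, DialTimeout: 5 * time.Second}
	c, err := newClientV3(cfg) // c.KV, c.Watcher, and c.Lease are proxy-backed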

+ 132 - 0
proxy/grpcproxy/chan_stream.go

@@ -0,0 +1,132 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"golang.org/x/net/context"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+)
+
+// chanServerStream implements grpc.ServerStream with a chanStream
+type chanServerStream struct {
+	headerc  chan<- metadata.MD
+	trailerc chan<- metadata.MD
+	grpc.Stream
+
+	headers []metadata.MD
+}
+
+func (ss *chanServerStream) SendHeader(md metadata.MD) error {
+	if ss.headerc == nil {
+		return errAlreadySentHeader
+	}
+	outmd := make(map[string][]string)
+	for _, h := range append(ss.headers, md) {
+		for k, v := range h {
+			outmd[k] = v
+		}
+	}
+	select {
+	case ss.headerc <- outmd:
+		ss.headerc = nil
+		ss.headers = nil
+		return nil
+	case <-ss.Context().Done():
+	}
+	return ss.Context().Err()
+}
+
+func (ss *chanServerStream) SetHeader(md metadata.MD) error {
+	if ss.headerc == nil {
+		return errAlreadySentHeader
+	}
+	ss.headers = append(ss.headers, md)
+	return nil
+}
+
+func (ss *chanServerStream) SetTrailer(md metadata.MD) {
+	ss.trailerc <- md
+}
+
+// chanClientStream implements grpc.ClientStream with a chanStream
+type chanClientStream struct {
+	headerc  <-chan metadata.MD
+	trailerc <-chan metadata.MD
+	*chanStream
+}
+
+func (cs *chanClientStream) Header() (metadata.MD, error) {
+	select {
+	case md := <-cs.headerc:
+		return md, nil
+	case <-cs.Context().Done():
+	}
+	return nil, cs.Context().Err()
+}
+
+func (cs *chanClientStream) Trailer() metadata.MD {
+	select {
+	case md := <-cs.trailerc:
+		return md
+	case <-cs.Context().Done():
+		return nil
+	}
+}
+
+func (cs *chanClientStream) CloseSend() error {
+	close(cs.chanStream.sendc)
+	return nil
+}
+
+// chanStream implements grpc.Stream using channels
+type chanStream struct {
+	recvc  <-chan interface{}
+	sendc  chan<- interface{}
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func (s *chanStream) Context() context.Context { return s.ctx }
+
+func (s *chanStream) SendMsg(m interface{}) error {
+	select {
+	case s.sendc <- m:
+		if err, ok := m.(error); ok {
+			return err
+		}
+		return nil
+	case <-s.ctx.Done():
+	}
+	return s.ctx.Err()
+}
+
+func (s *chanStream) RecvMsg(m interface{}) error {
+	v := m.(*interface{})
+	select {
+	case msg, ok := <-s.recvc:
+		if !ok {
+			return grpc.ErrClientConnClosing
+		}
+		if err, ok := msg.(error); ok {
+			return err
+		}
+		*v = msg
+		return nil
+	case <-s.ctx.Done():
+	}
+	return s.ctx.Err()
+}
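Two chanStreams wired back-to-back form an in-process bidirectional stream: each side's sendc is the other's recvc, and errors travel in-band because SendMsg and RecvMsg special-case error values. A minimal sketch (the shared context and channel sizes are illustrative):

	ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cli := &chanStream{recvc: ch1, sendc: ch2, ctx: ctx, cancel: cancel}
	srv := &chanStream{recvc: ch2, sendc: ch1, ctx: ctx, cancel: cancel}
	go func() { _ = srv.SendMsg("hello") }() // lands on ch1
	var v interface{}
	_ = cli.RecvMsg(&v) // v is now "hello"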

+ 313 - 33
proxy/grpcproxy/lease.go

@@ -15,73 +15,353 @@
 package grpcproxy
 
 import (
+	"io"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+
 	"golang.org/x/net/context"
 
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
 type leaseProxy struct {
-	client *clientv3.Client
+	// leaseClient handles requests from LeaseGrant() that require a lease ID.
+	leaseClient pb.LeaseClient
+
+	lessor clientv3.Lease
+
+	ctx context.Context
+
+	leader *leader
+
+	// mu protects registering outstanding leaseProxyStreams with wg.
+	mu sync.RWMutex
+
+	// wg waits until all outstanding leaseProxyStreams quit.
+	wg sync.WaitGroup
 }
 
-func NewLeaseProxy(c *clientv3.Client) pb.LeaseServer {
-	return &leaseProxy{
-		client: c,
+func NewLeaseProxy(c *clientv3.Client) (pb.LeaseServer, <-chan struct{}) {
+	cctx, cancel := context.WithCancel(c.Ctx())
+	lp := &leaseProxy{
+		leaseClient: pb.NewLeaseClient(c.ActiveConnection()),
+		lessor:      c.Lease,
+		ctx:         cctx,
+		leader:      newLeader(c.Ctx(), c.Watcher),
 	}
+	ch := make(chan struct{})
+	go func() {
+		defer close(ch)
+		<-lp.leader.stopNotify()
+		lp.mu.Lock()
+		select {
+		case <-lp.ctx.Done():
+		case <-lp.leader.disconnectNotify():
+			cancel()
+		}
+		<-lp.ctx.Done()
+		lp.mu.Unlock()
+		lp.wg.Wait()
+	}()
+	return lp, ch
 }
 
 func (lp *leaseProxy) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
-	conn := lp.client.ActiveConnection()
-	return pb.NewLeaseClient(conn).LeaseGrant(ctx, cr)
+	rp, err := lp.leaseClient.LeaseGrant(ctx, cr)
+	if err != nil {
+		return nil, err
+	}
+	lp.leader.gotLeader()
+	return rp, nil
 }
 
 func (lp *leaseProxy) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
-	conn := lp.client.ActiveConnection()
-	return pb.NewLeaseClient(conn).LeaseRevoke(ctx, rr)
+	r, err := lp.lessor.Revoke(ctx, clientv3.LeaseID(rr.ID))
+	if err != nil {
+		return nil, err
+	}
+	lp.leader.gotLeader()
+	return (*pb.LeaseRevokeResponse)(r), nil
 }
 
 func (lp *leaseProxy) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
-	conn := lp.client.ActiveConnection()
-	return pb.NewLeaseClient(conn).LeaseTimeToLive(ctx, rr)
+	var (
+		r   *clientv3.LeaseTimeToLiveResponse
+		err error
+	)
+	if rr.Keys {
+		r, err = lp.lessor.TimeToLive(ctx, clientv3.LeaseID(rr.ID), clientv3.WithAttachedKeys())
+	} else {
+		r, err = lp.lessor.TimeToLive(ctx, clientv3.LeaseID(rr.ID))
+	}
+	if err != nil {
+		return nil, err
+	}
+	rp := &pb.LeaseTimeToLiveResponse{
+		Header:     r.ResponseHeader,
+		ID:         int64(r.ID),
+		TTL:        r.TTL,
+		GrantedTTL: r.GrantedTTL,
+		Keys:       r.Keys,
+	}
+	return rp, err
 }
 
 func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
-	conn := lp.client.ActiveConnection()
+	lp.mu.Lock()
+	select {
+	case <-lp.ctx.Done():
+		lp.mu.Unlock()
+		return lp.ctx.Err()
+	default:
+		lp.wg.Add(1)
+	}
+	lp.mu.Unlock()
+
 	ctx, cancel := context.WithCancel(stream.Context())
-	lc, err := pb.NewLeaseClient(conn).LeaseKeepAlive(ctx)
-	if err != nil {
-		cancel()
-		return err
+	lps := leaseProxyStream{
+		stream:          stream,
+		lessor:          lp.lessor,
+		keepAliveLeases: make(map[int64]*atomicCounter),
+		respc:           make(chan *pb.LeaseKeepAliveResponse),
+		ctx:             ctx,
+		cancel:          cancel,
 	}
 
-	go func() {
-		// Cancel the context attached to lc to unblock lc.Recv when
-		// this routine returns on error.
-		defer cancel()
-
-		for {
-			// stream.Recv will be unblock when the loop in the parent routine
-			// returns on error.
-			rr, err := stream.Recv()
-			if err != nil {
-				return
-			}
-			err = lc.Send(rr)
-			if err != nil {
-				return
+	errc := make(chan error, 2)
+
+	var lostLeaderC <-chan struct{}
+	if md, ok := metadata.FromContext(stream.Context()); ok {
+		v := md[rpctypes.MetadataRequireLeaderKey]
+		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
+			lostLeaderC = lp.leader.lostNotify()
+			// if leader is known to be lost at creation time, avoid
+			// letting events through at all
+			select {
+			case <-lostLeaderC:
+				lp.wg.Done()
+				return rpctypes.ErrNoLeader
+			default:
 			}
 		}
+	}
+	stopc := make(chan struct{}, 3)
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		if err := lps.recvLoop(); err != nil {
+			errc <- err
+		}
 	}()
 
-	for {
-		rr, err := lc.Recv()
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		if err := lps.sendLoop(); err != nil {
+			errc <- err
+		}
+	}()
+
+	// tears down the LeaseKeepAlive stream if the leader goes down or the entire leaseProxy is terminated.
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		select {
+		case <-lostLeaderC:
+		case <-ctx.Done():
+		case <-lp.ctx.Done():
+		}
+	}()
+
+	var err error
+	select {
+	case <-stopc:
+		stopc <- struct{}{}
+	case err = <-errc:
+	}
+	cancel()
+
+	// recv/send may only shut down after the function exits;
+	// this goroutine notifies the lease proxy that the stream is done
+	go func() {
+		<-stopc
+		<-stopc
+		<-stopc
+		lps.close()
+		close(errc)
+		lp.wg.Done()
+	}()
+
+	select {
+	case <-lostLeaderC:
+		return rpctypes.ErrNoLeader
+	case <-lp.leader.disconnectNotify():
+		return grpc.ErrClientConnClosing
+	default:
 		if err != nil {
 			return err
 		}
-		err = stream.Send(rr)
+		return ctx.Err()
+	}
+}
+
+type leaseProxyStream struct {
+	stream pb.Lease_LeaseKeepAliveServer
+
+	lessor clientv3.Lease
+	// wg tracks keepAliveLoop goroutines
+	wg sync.WaitGroup
+	// mu protects keepAliveLeases
+	mu sync.RWMutex
+	// keepAliveLeases tracks, per lease, how many outstanding keepalive requests still need responses.
+	keepAliveLeases map[int64]*atomicCounter
+	// respc receives lease keepalive responses from the etcd backend
+	respc chan *pb.LeaseKeepAliveResponse
+
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func (lps *leaseProxyStream) recvLoop() error {
+	for {
+		rr, err := lps.stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
 		if err != nil {
 			return err
 		}
+		lps.mu.Lock()
+		neededResps, ok := lps.keepAliveLeases[rr.ID]
+		if !ok {
+			neededResps = &atomicCounter{}
+			lps.keepAliveLeases[rr.ID] = neededResps
+			lps.wg.Add(1)
+			go func() {
+				defer lps.wg.Done()
+				if err := lps.keepAliveLoop(rr.ID, neededResps); err != nil {
+					lps.cancel()
+				}
+			}()
+		}
+		neededResps.add(1)
+		lps.mu.Unlock()
 	}
 }
+
+func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCounter) error {
+	cctx, ccancel := context.WithCancel(lps.ctx)
+	defer ccancel()
+	respc, err := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID))
+	if err != nil {
+		return err
+	}
+	// ticker fires when the loop hasn't received a keepalive within the lease TTL
+	var ticker <-chan time.Time
+	for {
+		select {
+		case <-ticker:
+			lps.mu.Lock()
+			// if there are outstanding keepAlive requests when the ticker fires,
+			// don't exit keepAliveLoop(); let it continue processing them.
+			if neededResps.get() > 0 {
+				lps.mu.Unlock()
+				ticker = nil
+				continue
+			}
+			delete(lps.keepAliveLeases, leaseID)
+			lps.mu.Unlock()
+			return nil
+		case rp, ok := <-respc:
+			if !ok {
+				lps.mu.Lock()
+				delete(lps.keepAliveLeases, leaseID)
+				lps.mu.Unlock()
+				if neededResps.get() == 0 {
+					return nil
+				}
+				ttlResp, err := lps.lessor.TimeToLive(cctx, clientv3.LeaseID(leaseID))
+				if err != nil {
+					return err
+				}
+				r := &pb.LeaseKeepAliveResponse{
+					Header: ttlResp.ResponseHeader,
+					ID:     int64(ttlResp.ID),
+					TTL:    ttlResp.TTL,
+				}
+				for neededResps.get() > 0 {
+					select {
+					case lps.respc <- r:
+						neededResps.add(-1)
+					case <-lps.ctx.Done():
+						return nil
+					}
+				}
+				return nil
+			}
+			if neededResps.get() == 0 {
+				continue
+			}
+			ticker = time.After(time.Duration(rp.TTL) * time.Second)
+			r := &pb.LeaseKeepAliveResponse{
+				Header: rp.ResponseHeader,
+				ID:     int64(rp.ID),
+				TTL:    rp.TTL,
+			}
+			lps.replyToClient(r, neededResps)
+		}
+	}
+}
+
+func (lps *leaseProxyStream) replyToClient(r *pb.LeaseKeepAliveResponse, neededResps *atomicCounter) {
+	timer := time.After(500 * time.Millisecond)
+	for neededResps.get() > 0 {
+		select {
+		case lps.respc <- r:
+			neededResps.add(-1)
+		case <-timer:
+			return
+		case <-lps.ctx.Done():
+			return
+		}
+	}
+}
+
+func (lps *leaseProxyStream) sendLoop() error {
+	for {
+		select {
+		case lrp, ok := <-lps.respc:
+			if !ok {
+				return nil
+			}
+			if err := lps.stream.Send(lrp); err != nil {
+				return err
+			}
+		case <-lps.ctx.Done():
+			return lps.ctx.Err()
+		}
+	}
+}
+
+func (lps *leaseProxyStream) close() {
+	lps.cancel()
+	lps.wg.Wait()
+	// only close the respc channel once all keepAliveLoop() goroutines have finished,
+	// so those goroutines never send a response on a closed channel
+	close(lps.respc)
+}
+
+type atomicCounter struct {
+	counter int64
+}
+
+func (ac *atomicCounter) add(delta int64) {
+	atomic.AddInt64(&ac.counter, delta)
+}
+
+func (ac *atomicCounter) get() int64 {
+	return atomic.LoadInt64(&ac.counter)
+}
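This is where the coalescing happens: all keepalive requests for a given lease ID on a stream share one upstream lessor.KeepAlive, and atomicCounter meters how many responses are still owed to the client. A stream opts into leader-required semantics via gRPC metadata; a client-side sketch (cli and leaseID are illustrative):

	// WithRequireLeader attaches rpctypes.MetadataRequireLeaderKey, so the
	// proxy arms lostLeaderC and fails this stream with ErrNoLeader if the
	// etcd leader is lost.
	ctx := clientv3.WithRequireLeader(context.Background())
	kach, err := cli.KeepAlive(ctx, leaseID)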

+ 97 - 0
proxy/grpcproxy/lease_client_adapter.go

@@ -0,0 +1,97 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"golang.org/x/net/context"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+)
+
+type ls2lc struct {
+	leaseServer pb.LeaseServer
+}
+
+func LeaseServerToLeaseClient(ls pb.LeaseServer) pb.LeaseClient {
+	return &ls2lc{ls}
+}
+
+func (c *ls2lc) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (*pb.LeaseGrantResponse, error) {
+	return c.leaseServer.LeaseGrant(ctx, in)
+}
+
+func (c *ls2lc) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (*pb.LeaseRevokeResponse, error) {
+	return c.leaseServer.LeaseRevoke(ctx, in)
+}
+
+func (c *ls2lc) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (pb.Lease_LeaseKeepAliveClient, error) {
+	// ch1 is buffered so server can send error on close
+	ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
+	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
+
+	cctx, ccancel := context.WithCancel(ctx)
+	cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
+	lclient := &ls2lcClientStream{chanClientStream{headerc, trailerc, cli}}
+
+	sctx, scancel := context.WithCancel(ctx)
+	srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
+	lserver := &ls2lcServerStream{chanServerStream{headerc, trailerc, srv, nil}}
+	go func() {
+		if err := c.leaseServer.LeaseKeepAlive(lserver); err != nil {
+			select {
+			case srv.sendc <- err:
+			case <-sctx.Done():
+			case <-cctx.Done():
+			}
+		}
+		scancel()
+		ccancel()
+	}()
+	return lclient, nil
+}
+
+func (c *ls2lc) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (*pb.LeaseTimeToLiveResponse, error) {
+	return c.leaseServer.LeaseTimeToLive(ctx, in)
+}
+
+// ls2lcClientStream implements Lease_LeaseKeepAliveClient
+type ls2lcClientStream struct{ chanClientStream }
+
+// ls2lcServerStream implements Lease_LeaseKeepAliveServer
+type ls2lcServerStream struct{ chanServerStream }
+
+func (s *ls2lcClientStream) Send(rr *pb.LeaseKeepAliveRequest) error {
+	return s.SendMsg(rr)
+}
+func (s *ls2lcClientStream) Recv() (*pb.LeaseKeepAliveResponse, error) {
+	var v interface{}
+	if err := s.RecvMsg(&v); err != nil {
+		return nil, err
+	}
+	return v.(*pb.LeaseKeepAliveResponse), nil
+}
+
+func (s *ls2lcServerStream) Send(rr *pb.LeaseKeepAliveResponse) error {
+	return s.SendMsg(rr)
+}
+func (s *ls2lcServerStream) Recv() (*pb.LeaseKeepAliveRequest, error) {
+	var v interface{}
+	if err := s.RecvMsg(&v); err != nil {
+		return nil, err
+	}
+	return v.(*pb.LeaseKeepAliveRequest), nil
+}
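This adapter is what lets integration/cluster_proxy.go above treat the lease proxy as an ordinary client with no extra network hop: pair it with the new clientv3 constructor. A sketch (the 5-second timeout is illustrative):

	lp, _ := grpcproxy.NewLeaseProxy(client)
	lc := grpcproxy.LeaseServerToLeaseClient(lp)
	lease := clientv3.NewLeaseFromLeaseClient(lc, 5*time.Second)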

+ 0 - 110
proxy/grpcproxy/watch_client_adapter.go

@@ -84,113 +84,3 @@ func (s *ws2wcServerStream) Recv() (*pb.WatchRequest, error) {
 	}
 	return v.(*pb.WatchRequest), nil
 }
-
-// chanServerStream implements grpc.ServerStream with a chanStream
-type chanServerStream struct {
-	headerc  chan<- metadata.MD
-	trailerc chan<- metadata.MD
-	grpc.Stream
-
-	headers []metadata.MD
-}
-
-func (ss *chanServerStream) SendHeader(md metadata.MD) error {
-	if ss.headerc == nil {
-		return errAlreadySentHeader
-	}
-	outmd := make(map[string][]string)
-	for _, h := range append(ss.headers, md) {
-		for k, v := range h {
-			outmd[k] = v
-		}
-	}
-	select {
-	case ss.headerc <- outmd:
-		ss.headerc = nil
-		ss.headers = nil
-		return nil
-	case <-ss.Context().Done():
-	}
-	return ss.Context().Err()
-}
-
-func (ss *chanServerStream) SetHeader(md metadata.MD) error {
-	if ss.headerc == nil {
-		return errAlreadySentHeader
-	}
-	ss.headers = append(ss.headers, md)
-	return nil
-}
-
-func (ss *chanServerStream) SetTrailer(md metadata.MD) {
-	ss.trailerc <- md
-}
-
-// chanClientStream implements grpc.ClientStream with a chanStream
-type chanClientStream struct {
-	headerc  <-chan metadata.MD
-	trailerc <-chan metadata.MD
-	*chanStream
-}
-
-func (cs *chanClientStream) Header() (metadata.MD, error) {
-	select {
-	case md := <-cs.headerc:
-		return md, nil
-	case <-cs.Context().Done():
-	}
-	return nil, cs.Context().Err()
-}
-
-func (cs *chanClientStream) Trailer() metadata.MD {
-	select {
-	case md := <-cs.trailerc:
-		return md
-	case <-cs.Context().Done():
-		return nil
-	}
-}
-
-func (s *chanClientStream) CloseSend() error {
-	close(s.chanStream.sendc)
-	return nil
-}
-
-// chanStream implements grpc.Stream using channels
-type chanStream struct {
-	recvc  <-chan interface{}
-	sendc  chan<- interface{}
-	ctx    context.Context
-	cancel context.CancelFunc
-}
-
-func (s *chanStream) Context() context.Context { return s.ctx }
-
-func (s *chanStream) SendMsg(m interface{}) error {
-	select {
-	case s.sendc <- m:
-		if err, ok := m.(error); ok {
-			return err
-		}
-		return nil
-	case <-s.ctx.Done():
-	}
-	return s.ctx.Err()
-}
-
-func (s *chanStream) RecvMsg(m interface{}) error {
-	v := m.(*interface{})
-	select {
-	case msg, ok := <-s.recvc:
-		if !ok {
-			return grpc.ErrClientConnClosing
-		}
-		if err, ok := msg.(error); ok {
-			return err
-		}
-		*v = msg
-		return nil
-	case <-s.ctx.Done():
-	}
-	return s.ctx.Err()
-}