Browse Source

Merge pull request #6033 from heyitsanthony/watch-adapter

integration: support watch with cluster_proxy tag
Anthony Romano 9 years ago
parent
commit
537057bd11

+ 5 - 1
clientv3/watch.go

@@ -164,8 +164,12 @@ type watcherStream struct {
 }
 }
 
 
 func NewWatcher(c *Client) Watcher {
 func NewWatcher(c *Client) Watcher {
+	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn))
+}
+
+func NewWatchFromWatchClient(wc pb.WatchClient) Watcher {
 	return &watcher{
 	return &watcher{
-		remote:  pb.NewWatchClient(c.conn),
+		remote:  wc,
 		streams: make(map[string]*watchGrpcStream),
 		streams: make(map[string]*watchGrpcStream),
 	}
 	}
 }
 }

+ 9 - 1
integration/cluster_proxy.go

@@ -22,12 +22,17 @@ import (
 	"github.com/coreos/etcd/proxy/grpcproxy"
 	"github.com/coreos/etcd/proxy/grpcproxy"
 )
 )
 
 
+var proxies map[*clientv3.Client]grpcAPI = make(map[*clientv3.Client]grpcAPI)
+
 func toGRPC(c *clientv3.Client) grpcAPI {
 func toGRPC(c *clientv3.Client) grpcAPI {
+	if v, ok := proxies[c]; ok {
+		return v
+	}
 	return grpcAPI{
 	return grpcAPI{
 		pb.NewClusterClient(c.ActiveConnection()),
 		pb.NewClusterClient(c.ActiveConnection()),
 		grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)),
 		grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)),
 		pb.NewLeaseClient(c.ActiveConnection()),
 		pb.NewLeaseClient(c.ActiveConnection()),
-		pb.NewWatchClient(c.ActiveConnection()),
+		grpcproxy.WatchServerToWatchClient(grpcproxy.NewWatchProxy(c)),
 		pb.NewMaintenanceClient(c.ActiveConnection()),
 		pb.NewMaintenanceClient(c.ActiveConnection()),
 	}
 	}
 }
 }
@@ -37,6 +42,9 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+
+	proxies[c] = toGRPC(c)
 	c.KV = clientv3.NewKVFromKVClient(grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)))
 	c.KV = clientv3.NewKVFromKVClient(grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)))
+	c.Watcher = clientv3.NewWatchFromWatchClient(grpcproxy.WatchServerToWatchClient(grpcproxy.NewWatchProxy(c)))
 	return c, nil
 	return c, nil
 }
 }

+ 17 - 6
proxy/grpcproxy/watch.go

@@ -26,7 +26,7 @@ import (
 )
 )
 
 
 type watchProxy struct {
 type watchProxy struct {
-	c   *clientv3.Client
+	cw  clientv3.Watcher
 	wgs watchergroups
 	wgs watchergroups
 
 
 	mu           sync.Mutex
 	mu           sync.Mutex
@@ -35,9 +35,9 @@ type watchProxy struct {
 
 
 func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
 func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
 	return &watchProxy{
 	return &watchProxy{
-		c: c,
+		cw: c.Watcher,
 		wgs: watchergroups{
 		wgs: watchergroups{
-			c:      c,
+			cw:     c.Watcher,
 			groups: make(map[watchRange]*watcherGroup),
 			groups: make(map[watchRange]*watcherGroup),
 		},
 		},
 	}
 	}
@@ -49,7 +49,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 	wp.mu.Unlock()
 	wp.mu.Unlock()
 
 
 	sws := serverWatchStream{
 	sws := serverWatchStream{
-		c:      wp.c,
+		cw:     wp.cw,
 		groups: &wp.wgs,
 		groups: &wp.wgs,
 
 
 		id:         wp.nextStreamID,
 		id:         wp.nextStreamID,
@@ -68,7 +68,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 
 
 type serverWatchStream struct {
 type serverWatchStream struct {
 	id int64
 	id int64
-	c  *clientv3.Client
+	cw clientv3.Watcher
 
 
 	mu      sync.Mutex // make sure any access of groups and singles is atomic
 	mu      sync.Mutex // make sure any access of groups and singles is atomic
 	groups  *watchergroups
 	groups  *watchergroups
@@ -82,7 +82,18 @@ type serverWatchStream struct {
 	nextWatcherID int64
 	nextWatcherID int64
 }
 }
 
 
+func (sws *serverWatchStream) close() {
+	close(sws.watchCh)
+	close(sws.ctrlCh)
+	for _, ws := range sws.singles {
+		ws.stop()
+	}
+	sws.groups.stop()
+}
+
 func (sws *serverWatchStream) recvLoop() error {
 func (sws *serverWatchStream) recvLoop() error {
+	defer sws.close()
+
 	for {
 	for {
 		req, err := sws.gRPCStream.Recv()
 		req, err := sws.gRPCStream.Recv()
 		if err == io.EOF {
 		if err == io.EOF {
@@ -170,7 +181,7 @@ func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
 
 
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 
 
-	wch := sws.c.Watch(ctx,
+	wch := sws.cw.Watch(ctx,
 		w.wr.key, clientv3.WithRange(w.wr.end),
 		w.wr.key, clientv3.WithRange(w.wr.end),
 		clientv3.WithRev(rev),
 		clientv3.WithRev(rev),
 		clientv3.WithProgressNotify(),
 		clientv3.WithProgressNotify(),

+ 141 - 0
proxy/grpcproxy/watch_client_adapter.go

@@ -0,0 +1,141 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+)
+
+type ws2wc struct{ wserv pb.WatchServer }
+
+func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient {
+	return &ws2wc{wserv}
+}
+
+func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) {
+	ch1, ch2 := make(chan interface{}), make(chan interface{})
+	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
+	wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}}
+	wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}}}
+	go s.wserv.Watch(wserver)
+	return wclient, nil
+}
+
+// ws2wcClientStream implements Watch_WatchClient
+type ws2wcClientStream struct{ chanClientStream }
+
+// ws2wcServerStream implements Watch_WatchServer
+type ws2wcServerStream struct{ chanServerStream }
+
+func (s *ws2wcClientStream) Send(wr *pb.WatchRequest) error {
+	return s.SendMsg(wr)
+}
+func (s *ws2wcClientStream) Recv() (*pb.WatchResponse, error) {
+	var v interface{}
+	if err := s.RecvMsg(&v); err != nil {
+		return nil, err
+	}
+	return v.(*pb.WatchResponse), nil
+}
+
+func (s *ws2wcServerStream) Send(wr *pb.WatchResponse) error {
+	return s.SendMsg(wr)
+}
+func (s *ws2wcServerStream) Recv() (*pb.WatchRequest, error) {
+	var v interface{}
+	if err := s.RecvMsg(&v); err != nil {
+		return nil, err
+	}
+	return v.(*pb.WatchRequest), nil
+}
+
+// chanServerStream implements grpc.ServerStream with a chanStream
+type chanServerStream struct {
+	headerc  chan<- metadata.MD
+	trailerc chan<- metadata.MD
+	grpc.Stream
+}
+
+func (ss *chanServerStream) SendHeader(md metadata.MD) error {
+	select {
+	case ss.headerc <- md:
+		return nil
+	case <-ss.Context().Done():
+	}
+	return ss.Context().Err()
+}
+
+func (ss *chanServerStream) SetTrailer(md metadata.MD) {
+	ss.trailerc <- md
+}
+
+// chanClientStream implements grpc.ClientStream with a chanStream
+type chanClientStream struct {
+	headerc  <-chan metadata.MD
+	trailerc <-chan metadata.MD
+	grpc.Stream
+}
+
+func (cs *chanClientStream) Header() (metadata.MD, error) {
+	select {
+	case md := <-cs.headerc:
+		return md, nil
+	case <-cs.Context().Done():
+	}
+	return nil, cs.Context().Err()
+}
+
+func (cs *chanClientStream) Trailer() metadata.MD {
+	select {
+	case md := <-cs.trailerc:
+		return md
+	case <-cs.Context().Done():
+		return nil
+	}
+}
+
+func (s *chanClientStream) CloseSend() error { return nil }
+
+// chanStream implements grpc.Stream using channels
+type chanStream struct {
+	recvc <-chan interface{}
+	sendc chan<- interface{}
+	ctx   context.Context
+}
+
+func (s *chanStream) Context() context.Context { return s.Context() }
+
+func (s *chanStream) SendMsg(m interface{}) error {
+	select {
+	case s.sendc <- m:
+		return nil
+	case <-s.ctx.Done():
+	}
+	return s.ctx.Err()
+}
+
+func (s *chanStream) RecvMsg(m interface{}) error {
+	v := m.(*interface{})
+	select {
+	case m = <-s.recvc:
+		*v = m
+		return nil
+	case <-s.ctx.Done():
+	}
+	return s.ctx.Err()
+}

+ 10 - 2
proxy/grpcproxy/watcher_groups.go

@@ -22,7 +22,7 @@ import (
 )
 )
 
 
 type watchergroups struct {
 type watchergroups struct {
-	c *clientv3.Client
+	cw clientv3.Watcher
 
 
 	mu        sync.Mutex
 	mu        sync.Mutex
 	groups    map[watchRange]*watcherGroup
 	groups    map[watchRange]*watcherGroup
@@ -42,7 +42,7 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
 
 
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 
 
-	wch := wgs.c.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify())
+	wch := wgs.cw.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify())
 	watchg := newWatchergroup(wch, cancel)
 	watchg := newWatchergroup(wch, cancel)
 	watchg.add(rid, w)
 	watchg.add(rid, w)
 	go watchg.run()
 	go watchg.run()
@@ -86,3 +86,11 @@ func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingl
 
 
 	return false
 	return false
 }
 }
+
+func (wgs *watchergroups) stop() {
+	wgs.mu.Lock()
+	defer wgs.mu.Unlock()
+	for _, wg := range wgs.groups {
+		wg.stop()
+	}
+}