Browse Source

grpcproxy: bind clientv3.Watcher on initialization

Anthony Romano 9 years ago
parent
commit
418bb5e176
2 changed files with 8 additions and 8 deletions
  1. 6 6
      proxy/grpcproxy/watch.go
  2. 2 2
      proxy/grpcproxy/watcher_groups.go

+ 6 - 6
proxy/grpcproxy/watch.go

@@ -26,7 +26,7 @@ import (
 )
 
 type watchProxy struct {
-	c   *clientv3.Client
+	cw  clientv3.Watcher
 	wgs watchergroups
 
 	mu           sync.Mutex
@@ -35,9 +35,9 @@ type watchProxy struct {
 
 func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
 	return &watchProxy{
-		c: c,
+		cw: c.Watcher,
 		wgs: watchergroups{
-			c:      c,
+			cw:     c.Watcher,
 			groups: make(map[watchRange]*watcherGroup),
 		},
 	}
@@ -49,7 +49,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 	wp.mu.Unlock()
 
 	sws := serverWatchStream{
-		c:      wp.c,
+		cw:     wp.cw,
 		groups: &wp.wgs,
 
 		id:         wp.nextStreamID,
@@ -68,7 +68,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 
 type serverWatchStream struct {
 	id int64
-	c  *clientv3.Client
+	cw clientv3.Watcher
 
 	mu      sync.Mutex // make sure any access of groups and singles is atomic
 	groups  *watchergroups
@@ -170,7 +170,7 @@ func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
 
 	ctx, cancel := context.WithCancel(context.Background())
 
-	wch := sws.c.Watch(ctx,
+	wch := sws.cw.Watch(ctx,
 		w.wr.key, clientv3.WithRange(w.wr.end),
 		clientv3.WithRev(rev),
 		clientv3.WithProgressNotify(),

+ 2 - 2
proxy/grpcproxy/watcher_groups.go

@@ -22,7 +22,7 @@ import (
 )
 
 type watchergroups struct {
-	c *clientv3.Client
+	cw clientv3.Watcher
 
 	mu        sync.Mutex
 	groups    map[watchRange]*watcherGroup
@@ -42,7 +42,7 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
 
 	ctx, cancel := context.WithCancel(context.Background())
 
-	wch := wgs.c.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify())
+	wch := wgs.cw.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify())
 	watchg := newWatchergroup(wch, cancel)
 	watchg.add(rid, w)
 	go watchg.run()