Browse Source

Merge pull request #6337 from xiang90/watch_cancel

grpcproxy: support cancel watcher
Xiang Li 9 years ago
parent
commit
f7293125cf
3 changed files with 43 additions and 20 deletions
  1. 31 16
      proxy/grpcproxy/watch.go
  2. 6 1
      proxy/grpcproxy/watcher_group.go
  3. 6 3
      proxy/grpcproxy/watcher_groups.go

+ 31 - 16
proxy/grpcproxy/watch.go

@@ -39,9 +39,10 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
 	wp := &watchProxy{
 		cw: c.Watcher,
 		wgs: watchergroups{
-			cw:       c.Watcher,
-			groups:   make(map[watchRange]*watcherGroup),
-			proxyCtx: c.Ctx(),
+			cw:        c.Watcher,
+			groups:    make(map[watchRange]*watcherGroup),
+			idToGroup: make(map[receiverID]*watcherGroup),
+			proxyCtx:  c.Ctx(),
 		},
 		ctx: c.Ctx(),
 	}
@@ -65,7 +66,6 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 		id:         wp.nextStreamID,
 		gRPCStream: stream,
 
-		ctrlCh:  make(chan *pb.WatchResponse, 10),
 		watchCh: make(chan *pb.WatchResponse, 10),
 
 		proxyCtx: wp.ctx,
@@ -86,7 +86,6 @@ type serverWatchStream struct {
 
 	gRPCStream pb.Watch_WatchServer
 
-	ctrlCh  chan *pb.WatchResponse
 	watchCh chan *pb.WatchResponse
 
 	nextWatcherID int64
@@ -96,7 +95,6 @@ type serverWatchStream struct {
 
 func (sws *serverWatchStream) close() {
 	close(sws.watchCh)
-	close(sws.ctrlCh)
 
 	var wg sync.WaitGroup
 	sws.mu.Lock()
@@ -166,14 +164,6 @@ func (sws *serverWatchStream) sendLoop() {
 			if err := sws.gRPCStream.Send(wresp); err != nil {
 				return
 			}
-
-		case c, ok := <-sws.ctrlCh:
-			if !ok {
-				return
-			}
-			if err := sws.gRPCStream.Send(c); err != nil {
-				return
-			}
 		case <-sws.proxyCtx.Done():
 			return
 		}
@@ -222,12 +212,37 @@ func (sws *serverWatchStream) removeWatcher(id int64) {
 	sws.mu.Lock()
 	defer sws.mu.Unlock()
 
-	if sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id}) {
+	var (
+		rev int64
+		ok  bool
+	)
+
+	defer func() {
+		if !ok {
+			return
+		}
+		resp := &pb.WatchResponse{
+			Header: &pb.ResponseHeader{
+				// todo: fill in ClusterId
+				// todo: fill in MemberId:
+				Revision: rev,
+				// todo: fill in RaftTerm:
+			},
+			WatchId:  id,
+			Canceled: true,
+		}
+		sws.watchCh <- resp
+	}()
+
+	rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id})
+	if ok {
 		return
 	}
 
-	if ws, ok := sws.singles[id]; ok {
+	var ws *watcherSingle
+	if ws, ok = sws.singles[id]; ok {
 		delete(sws.singles, id)
 		ws.stop()
+		rev = ws.lastStoreRev
 	}
 }

+ 6 - 1
proxy/grpcproxy/watcher_group.go

@@ -70,7 +70,6 @@ func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) {
 func (wg *watcherGroup) add(rid receiverID, w watcher) {
 	wg.mu.Lock()
 	defer wg.mu.Unlock()
-
 	wg.receivers[rid] = w
 }
 
@@ -92,3 +91,9 @@ func (wg *watcherGroup) stop() {
 	wg.cancel()
 	<-wg.donec
 }
+
+func (wg *watcherGroup) revision() int64 {
+	wg.mu.Lock()
+	defer wg.mu.Unlock()
+	return wg.rev
+}

+ 6 - 3
proxy/grpcproxy/watcher_groups.go

@@ -39,6 +39,7 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
 
 	if wg, ok := groups[w.wr]; ok {
 		wg.add(rid, w)
+		wgs.idToGroup[rid] = wg
 		return
 	}
 
@@ -54,20 +55,22 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
 	watchg.add(rid, w)
 	go watchg.run()
 	groups[w.wr] = watchg
+	wgs.idToGroup[rid] = watchg
 }
 
-func (wgs *watchergroups) removeWatcher(rid receiverID) bool {
+func (wgs *watchergroups) removeWatcher(rid receiverID) (int64, bool) {
 	wgs.mu.Lock()
 	defer wgs.mu.Unlock()
 
 	if g, ok := wgs.idToGroup[rid]; ok {
 		g.delete(rid)
+		delete(wgs.idToGroup, rid)
 		if g.isEmpty() {
 			g.stop()
 		}
-		return true
+		return g.revision(), true
 	}
-	return false
+	return -1, false
 }
 
 func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingle) bool {