Browse Source

grpcproxy: support cancel watcher

We do not wait for the cancellation from actual etcd server,
but generate it at the proxy side. The rule is to return the
latest rev that the watcher has seen. This should be good
enough for most use cases if not all.
Xiang Li 9 years ago
parent
commit
51b4d6b7a8
3 changed files with 43 additions and 20 deletions
  1. 31 16
      proxy/grpcproxy/watch.go
  2. 6 1
      proxy/grpcproxy/watcher_group.go
  3. 6 3
      proxy/grpcproxy/watcher_groups.go

+ 31 - 16
proxy/grpcproxy/watch.go

@@ -39,9 +39,10 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
 	wp := &watchProxy{
 	wp := &watchProxy{
 		cw: c.Watcher,
 		cw: c.Watcher,
 		wgs: watchergroups{
 		wgs: watchergroups{
-			cw:       c.Watcher,
-			groups:   make(map[watchRange]*watcherGroup),
-			proxyCtx: c.Ctx(),
+			cw:        c.Watcher,
+			groups:    make(map[watchRange]*watcherGroup),
+			idToGroup: make(map[receiverID]*watcherGroup),
+			proxyCtx:  c.Ctx(),
 		},
 		},
 		ctx: c.Ctx(),
 		ctx: c.Ctx(),
 	}
 	}
@@ -65,7 +66,6 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 		id:         wp.nextStreamID,
 		id:         wp.nextStreamID,
 		gRPCStream: stream,
 		gRPCStream: stream,
 
 
-		ctrlCh:  make(chan *pb.WatchResponse, 10),
 		watchCh: make(chan *pb.WatchResponse, 10),
 		watchCh: make(chan *pb.WatchResponse, 10),
 
 
 		proxyCtx: wp.ctx,
 		proxyCtx: wp.ctx,
@@ -86,7 +86,6 @@ type serverWatchStream struct {
 
 
 	gRPCStream pb.Watch_WatchServer
 	gRPCStream pb.Watch_WatchServer
 
 
-	ctrlCh  chan *pb.WatchResponse
 	watchCh chan *pb.WatchResponse
 	watchCh chan *pb.WatchResponse
 
 
 	nextWatcherID int64
 	nextWatcherID int64
@@ -96,7 +95,6 @@ type serverWatchStream struct {
 
 
 func (sws *serverWatchStream) close() {
 func (sws *serverWatchStream) close() {
 	close(sws.watchCh)
 	close(sws.watchCh)
-	close(sws.ctrlCh)
 
 
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
 	sws.mu.Lock()
 	sws.mu.Lock()
@@ -166,14 +164,6 @@ func (sws *serverWatchStream) sendLoop() {
 			if err := sws.gRPCStream.Send(wresp); err != nil {
 			if err := sws.gRPCStream.Send(wresp); err != nil {
 				return
 				return
 			}
 			}
-
-		case c, ok := <-sws.ctrlCh:
-			if !ok {
-				return
-			}
-			if err := sws.gRPCStream.Send(c); err != nil {
-				return
-			}
 		case <-sws.proxyCtx.Done():
 		case <-sws.proxyCtx.Done():
 			return
 			return
 		}
 		}
@@ -222,12 +212,37 @@ func (sws *serverWatchStream) removeWatcher(id int64) {
 	sws.mu.Lock()
 	sws.mu.Lock()
 	defer sws.mu.Unlock()
 	defer sws.mu.Unlock()
 
 
-	if sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id}) {
+	var (
+		rev int64
+		ok  bool
+	)
+
+	defer func() {
+		if !ok {
+			return
+		}
+		resp := &pb.WatchResponse{
+			Header: &pb.ResponseHeader{
+				// todo: fill in ClusterId
+				// todo: fill in MemberId:
+				Revision: rev,
+				// todo: fill in RaftTerm:
+			},
+			WatchId:  id,
+			Canceled: true,
+		}
+		sws.watchCh <- resp
+	}()
+
+	rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id})
+	if ok {
 		return
 		return
 	}
 	}
 
 
-	if ws, ok := sws.singles[id]; ok {
+	var ws *watcherSingle
+	if ws, ok = sws.singles[id]; ok {
 		delete(sws.singles, id)
 		delete(sws.singles, id)
 		ws.stop()
 		ws.stop()
+		rev = ws.lastStoreRev
 	}
 	}
 }
 }

+ 6 - 1
proxy/grpcproxy/watcher_group.go

@@ -70,7 +70,6 @@ func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) {
 func (wg *watcherGroup) add(rid receiverID, w watcher) {
 func (wg *watcherGroup) add(rid receiverID, w watcher) {
 	wg.mu.Lock()
 	wg.mu.Lock()
 	defer wg.mu.Unlock()
 	defer wg.mu.Unlock()
-
 	wg.receivers[rid] = w
 	wg.receivers[rid] = w
 }
 }
 
 
@@ -92,3 +91,9 @@ func (wg *watcherGroup) stop() {
 	wg.cancel()
 	wg.cancel()
 	<-wg.donec
 	<-wg.donec
 }
 }
+
+func (wg *watcherGroup) revision() int64 {
+	wg.mu.Lock()
+	defer wg.mu.Unlock()
+	return wg.rev
+}

+ 6 - 3
proxy/grpcproxy/watcher_groups.go

@@ -39,6 +39,7 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
 
 
 	if wg, ok := groups[w.wr]; ok {
 	if wg, ok := groups[w.wr]; ok {
 		wg.add(rid, w)
 		wg.add(rid, w)
+		wgs.idToGroup[rid] = wg
 		return
 		return
 	}
 	}
 
 
@@ -54,20 +55,22 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
 	watchg.add(rid, w)
 	watchg.add(rid, w)
 	go watchg.run()
 	go watchg.run()
 	groups[w.wr] = watchg
 	groups[w.wr] = watchg
+	wgs.idToGroup[rid] = watchg
 }
 }
 
 
-func (wgs *watchergroups) removeWatcher(rid receiverID) bool {
+func (wgs *watchergroups) removeWatcher(rid receiverID) (int64, bool) {
 	wgs.mu.Lock()
 	wgs.mu.Lock()
 	defer wgs.mu.Unlock()
 	defer wgs.mu.Unlock()
 
 
 	if g, ok := wgs.idToGroup[rid]; ok {
 	if g, ok := wgs.idToGroup[rid]; ok {
 		g.delete(rid)
 		g.delete(rid)
+		delete(wgs.idToGroup, rid)
 		if g.isEmpty() {
 		if g.isEmpty() {
 			g.stop()
 			g.stop()
 		}
 		}
-		return true
+		return g.revision(), true
 	}
 	}
-	return false
+	return -1, false
 }
 }
 
 
 func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingle) bool {
 func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingle) bool {