Browse Source

grpcproxy: stop watchers in watch groups

Xiang Li 9 years ago
parent
commit
eded62e60c
2 changed files with 30 additions and 15 deletions
  1. 29 11
      proxy/grpcproxy/watch.go
  2. 1 4
      proxy/grpcproxy/watcher_single.go

+ 29 - 11
proxy/grpcproxy/watch.go

@@ -59,9 +59,10 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 	wp.mu.Unlock()
 
 	sws := serverWatchStream{
-		cw:      wp.cw,
-		groups:  &wp.wgs,
-		singles: make(map[int64]*watcherSingle),
+		cw:       wp.cw,
+		groups:   &wp.wgs,
+		singles:  make(map[int64]*watcherSingle),
+		inGroups: make(map[int64]struct{}),
 
 		id:         wp.nextStreamID,
 		gRPCStream: stream,
@@ -80,9 +81,10 @@ type serverWatchStream struct {
 	id int64
 	cw clientv3.Watcher
 
-	mu      sync.Mutex // make sure any access of groups and singles is atomic
-	groups  *watchergroups
-	singles map[int64]*watcherSingle
+	mu       sync.Mutex // make sure any access of groups and singles is atomic
+	groups   *watchergroups
+	singles  map[int64]*watcherSingle
+	inGroups map[int64]struct{}
 
 	gRPCStream pb.Watch_WatchServer
 
@@ -94,22 +96,31 @@ type serverWatchStream struct {
 }
 
 func (sws *serverWatchStream) close() {
-	close(sws.watchCh)
-
 	var wg sync.WaitGroup
 	sws.mu.Lock()
-	wg.Add(len(sws.singles))
+	wg.Add(len(sws.singles) + len(sws.inGroups))
 	for _, ws := range sws.singles {
-		ws.stop()
 		// copy the range variable to avoid race
 		copyws := ws
 		go func() {
-			<-copyws.stopNotify()
+			copyws.stop()
+			wg.Done()
+		}()
+	}
+	for id := range sws.inGroups {
+		// copy the range variable to avoid race
+		wid := id
+		go func() {
+			sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: wid})
 			wg.Done()
 		}()
 	}
+	sws.inGroups = nil
 	sws.mu.Unlock()
+
 	wg.Wait()
+
+	close(sws.watchCh)
 }
 
 func (sws *serverWatchStream) recvLoop() error {
@@ -176,6 +187,7 @@ func (sws *serverWatchStream) addCoalescedWatcher(w watcher) {
 
 	rid := receiverID{streamID: sws.id, watcherID: w.id}
 	sws.groups.addWatcher(rid, w)
+	sws.inGroups[w.id] = struct{}{}
 }
 
 func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
@@ -201,8 +213,13 @@ func (sws *serverWatchStream) maybeCoalesceWatcher(ws watcherSingle) bool {
 	defer sws.mu.Unlock()
 
 	rid := receiverID{streamID: sws.id, watcherID: ws.w.id}
+	// do not add new watchers when stream is closing
+	if sws.inGroups == nil {
+		return false
+	}
 	if sws.groups.maybeJoinWatcherSingle(rid, ws) {
 		delete(sws.singles, ws.w.id)
+		sws.inGroups[ws.w.id] = struct{}{}
 		return true
 	}
 	return false
@@ -236,6 +253,7 @@ func (sws *serverWatchStream) removeWatcher(id int64) {
 
 	rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id})
 	if ok {
+		delete(sws.inGroups, id)
 		return
 	}
 

+ 1 - 4
proxy/grpcproxy/watcher_single.go

@@ -70,8 +70,5 @@ func (ws watcherSingle) canPromote() bool {
 
 func (ws watcherSingle) stop() {
 	ws.cancel()
-}
-
-func (ws watcherSingle) stopNotify() <-chan struct{} {
-	return ws.donec
+	<-ws.donec
 }