Browse Source

grpcproxy: fix race on watcher revision

Was racing between broadcast setting the watchgroup revision
and joining single watchers.
Anthony Romano 9 years ago
parent
commit
bd1985d84b
2 changed files with 6 additions and 6 deletions
  1. 5 1
      proxy/grpcproxy/watcher_group.go
  2. 1 5
      proxy/grpcproxy/watcher_groups.go

+ 5 - 1
proxy/grpcproxy/watcher_group.go

@@ -68,10 +68,14 @@ func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) {
 }
 
 // add adds the watcher into the group with given ID.
-// The current revision of the watcherGroup is returned.
+// The current revision of the watcherGroup is returned or -1
+// if the watcher is at a revision prior to the watcher group.
 func (wg *watcherGroup) add(rid receiverID, w watcher) int64 {
 	wg.mu.Lock()
 	defer wg.mu.Unlock()
+	if wg.rev > w.rev {
+		return -1
+	}
 	wg.receivers[rid] = w
 	return wg.rev
 }

+ 1 - 5
proxy/grpcproxy/watcher_groups.go

@@ -102,11 +102,7 @@ func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingl
 
 	group, ok := wgs.groups[ws.w.wr]
 	if ok {
-		if ws.w.rev >= group.rev {
-			group.add(receiverID{streamID: ws.sws.id, watcherID: ws.w.id}, ws.w)
-			return true
-		}
-		return false
+		return group.add(receiverID{streamID: ws.sws.id, watcherID: ws.w.id}, ws.w) != -1
 	}
 
 	if ws.canPromote() {