Browse Source

v3rpc: don't race on current watcher header revision

Anthony Romano 9 years ago
parent
commit
af225e7433
2 changed files with 19 additions and 3 deletions
  1. 9 2
      etcdserver/api/v3rpc/watch.go
  2. 10 1
      storage/watchable_store.go

+ 9 - 2
etcdserver/api/v3rpc/watch.go

@@ -102,9 +102,16 @@ func (sws *serverWatchStream) recvLoop() error {
 					toWatch = creq.Prefix
 					prefix = true
 				}
-				id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
+
+				rev := creq.StartRevision
+				wsrev := sws.watchStream.Rev()
+				if rev == 0 {
+					// rev 0 watches past the current revision
+					rev = wsrev + 1
+				}
+				id := sws.watchStream.Watch(toWatch, prefix, rev)
 				sws.ctrlStream <- &pb.WatchResponse{
-					Header:  sws.newResponseHeader(sws.watchStream.Rev()),
+					Header:  sws.newResponseHeader(wsrev),
 					WatchId: int64(id),
 					Created: true,
 				}

+ 10 - 1
storage/watchable_store.go

@@ -227,7 +227,16 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch
 		ch:     ch,
 	}
 
-	if startRev == 0 {
+	s.store.mu.Lock()
+	synced := startRev > s.store.currentRev.main || startRev == 0
+	if synced {
+		wa.cur = s.store.currentRev.main + 1
+	}
+	s.store.mu.Unlock()
+	if synced {
+		if startRev > wa.cur {
+			panic("can't watch past sync revision")
+		}
 		s.synced.add(wa)
 	} else {
 		slowWatcherGauge.Inc()