Browse Source

mvcc: only remove watch cancel after cancel completes

If Close() is called before Cancel()'s cancel() completes, the
watch channel will be closed while the watch is still in the
synced list. If there's an event, etcd will try to write to a
closed channel. Instead, remove the watch from the bookkeeping
structures only after cancel completes, so Close() will always
call it.

Fixes #8443
Anthony Romano 8 years ago
parent
commit
896447ed99
2 changed files with 17 additions and 6 deletions
  1. 4 2
      mvcc/watchable_store.go
  2. 13 4
      mvcc/watcher.go

+ 4 - 2
mvcc/watchable_store.go

@@ -144,7 +144,6 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
 func (s *watchableStore) cancelWatcher(wa *watcher) {
 func (s *watchableStore) cancelWatcher(wa *watcher) {
 	for {
 	for {
 		s.mu.Lock()
 		s.mu.Lock()
-
 		if s.unsynced.delete(wa) {
 		if s.unsynced.delete(wa) {
 			slowWatcherGauge.Dec()
 			slowWatcherGauge.Dec()
 			break
 			break
@@ -152,6 +151,9 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
 			break
 			break
 		} else if wa.compacted {
 		} else if wa.compacted {
 			break
 			break
+		} else if wa.ch == nil {
+			// already canceled (e.g., cancel/close race)
+			break
 		}
 		}
 
 
 		if !wa.victim {
 		if !wa.victim {
@@ -177,6 +179,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
 	}
 	}
 
 
 	watcherGauge.Dec()
 	watcherGauge.Dec()
+	wa.ch = nil
 	s.mu.Unlock()
 	s.mu.Unlock()
 }
 }
 
 
@@ -425,7 +428,6 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
 		if eb.revs != 1 {
 		if eb.revs != 1 {
 			plog.Panicf("unexpected multiple revisions in notification")
 			plog.Panicf("unexpected multiple revisions in notification")
 		}
 		}
-
 		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
 		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
 			pendingEventsGauge.Add(float64(len(eb.evs)))
 			pendingEventsGauge.Add(float64(len(eb.evs)))
 		} else {
 		} else {

+ 13 - 4
mvcc/watcher.go

@@ -129,16 +129,25 @@ func (ws *watchStream) Chan() <-chan WatchResponse {
 func (ws *watchStream) Cancel(id WatchID) error {
 func (ws *watchStream) Cancel(id WatchID) error {
 	ws.mu.Lock()
 	ws.mu.Lock()
 	cancel, ok := ws.cancels[id]
 	cancel, ok := ws.cancels[id]
+	w := ws.watchers[id]
 	ok = ok && !ws.closed
 	ok = ok && !ws.closed
-	if ok {
-		delete(ws.cancels, id)
-		delete(ws.watchers, id)
-	}
 	ws.mu.Unlock()
 	ws.mu.Unlock()
+
 	if !ok {
 	if !ok {
 		return ErrWatcherNotExist
 		return ErrWatcherNotExist
 	}
 	}
 	cancel()
 	cancel()
+
+	ws.mu.Lock()
+	// The watch isn't removed until cancel so that if Close() is called,
+	// it will wait for the cancel. Otherwise, Close() could close the
+	// watch channel while the store is still posting events.
+	if ww := ws.watchers[id]; ww == w {
+		delete(ws.cancels, id)
+		delete(ws.watchers, id)
+	}
+	ws.mu.Unlock()
+
 	return nil
 	return nil
 }
 }