
Merge pull request #5464 from heyitsanthony/fix-victim-watchers

mvcc: tighten up watcher cancelation and revision handling
Anthony Romano · 9 years ago
commit 9c767cbf98
4 changed files with 97 additions and 58 deletions:
  1. etcdserver/api/v3rpc/watch.go (+21 -10)
  2. mvcc/watchable_store.go (+68 -41)
  3. mvcc/watchable_store_test.go (+2 -2)
  4. mvcc/watcher_group.go (+6 -5)

etcdserver/api/v3rpc/watch.go (+21 -10)

@@ -94,9 +94,12 @@ type serverWatchStream struct {

 	// closec indicates the stream is closed.
 	closec chan struct{}
+
+	// wg waits for the send loop to complete
+	wg sync.WaitGroup
 }

-func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
+func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 	sws := serverWatchStream{
 		clusterID:   ws.clusterID,
 		memberID:    ws.memberID,
@@ -109,23 +112,30 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
 		closec:     make(chan struct{}),
 	}

-	go sws.sendLoop()
-	errc := make(chan error, 1)
+	sws.wg.Add(1)
 	go func() {
-		errc <- sws.recvLoop()
-		sws.close()
+		sws.sendLoop()
+		sws.wg.Done()
 	}()
+
+	errc := make(chan error, 1)
+	// Ideally recvLoop would also use sws.wg to signal its completion
+	// but when stream.Context().Done() is closed, the stream's recv
+	// may continue to block since it uses a different context, leading to
+	// deadlock when calling sws.close().
+	go func() { errc <- sws.recvLoop() }()
+
 	select {
-	case err := <-errc:
-		return err
+	case err = <-errc:
 	case <-stream.Context().Done():
-		err := stream.Context().Err()
+		err = stream.Context().Err()
 		// the only server-side cancellation is noleader for now.
 		if err == context.Canceled {
-			return rpctypes.ErrGRPCNoLeader
+			err = rpctypes.ErrGRPCNoLeader
 		}
-		return err
 	}
+	sws.close()
+	return err
 }

 func (sws *serverWatchStream) recvLoop() error {
@@ -292,6 +302,7 @@ func (sws *serverWatchStream) close() {
 	sws.watchStream.Close()
 	close(sws.closec)
 	close(sws.ctrlStream)
+	sws.wg.Wait()
 }

 func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {

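Why the WaitGroup: Watch now returns through a single exit path that calls sws.close(), and close() tears down channels the send loop may still be using, so close() must not return until the send loop has fully exited. A minimal sketch of the pattern, with hypothetical names standing in for the gRPC stream machinery:

package main

import (
	"fmt"
	"sync"
)

// streamer mirrors the patch's shutdown ordering: close() signals the
// send loop, then blocks on the WaitGroup until the loop has fully exited.
type streamer struct {
	ctrl chan string
	wg   sync.WaitGroup
}

func (s *streamer) sendLoop() {
	for msg := range s.ctrl { // exits once ctrl is closed
		fmt.Println("send:", msg)
	}
}

func (s *streamer) start() {
	s.wg.Add(1)
	go func() {
		s.sendLoop()
		s.wg.Done()
	}()
}

func (s *streamer) close() {
	close(s.ctrl) // ask sendLoop to drain and exit
	s.wg.Wait()   // don't return until it has
}

func main() {
	s := &streamer{ctrl: make(chan string, 2)}
	s.start()
	s.ctrl <- "created"
	s.ctrl <- "canceled"
	s.close()
}

recvLoop is deliberately left out of the WaitGroup: as the new comment in the diff explains, its recv can keep blocking after stream.Context().Done() fires, so waiting on it inside close() could deadlock.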
mvcc/watchable_store.go (+68 -41)

@@ -190,19 +190,19 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
 	defer s.mu.Unlock()

 	wa := &watcher{
-		key: key,
-		end: end,
-		cur: startRev,
-		id:  id,
-		ch:  ch,
+		key:    key,
+		end:    end,
+		minRev: startRev,
+		id:     id,
+		ch:     ch,
 	}

 	s.store.mu.Lock()
 	synced := startRev > s.store.currentRev.main || startRev == 0
 	if synced {
-		wa.cur = s.store.currentRev.main + 1
-		if startRev > wa.cur {
-			wa.cur = startRev
+		wa.minRev = s.store.currentRev.main + 1
+		if startRev > wa.minRev {
+			wa.minRev = startRev
 		}
 	}
 	s.store.mu.Unlock()
@@ -214,30 +214,47 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
 	}
 	watcherGauge.Inc()

-	cancel := cancelFunc(func() {
+	return wa, func() { s.cancelWatcher(wa) }
+}
+
+// cancelWatcher removes references of the watcher from the watchableStore
+func (s *watchableStore) cancelWatcher(wa *watcher) {
+	for {
 		s.mu.Lock()
-		// remove references of the watcher
+
 		if s.unsynced.delete(wa) {
 			slowWatcherGauge.Dec()
-			watcherGauge.Dec()
+			break
 		} else if s.synced.delete(wa) {
-			watcherGauge.Dec()
-		} else {
-			for _, wb := range s.victims {
-				if wb[wa] != nil {
-					slowWatcherGauge.Dec()
-					watcherGauge.Dec()
-					delete(wb, wa)
-					break
-				}
+			break
+		} else if wa.compacted {
+			break
+		}
+
+		if !wa.victim {
+			panic("watcher not victim but not in watch groups")
+		}
+
+		var victimBatch watcherBatch
+		for _, wb := range s.victims {
+			if wb[wa] != nil {
+				victimBatch = wb
+				break
 			}
 		}
-		s.mu.Unlock()
+		if victimBatch != nil {
+			slowWatcherGauge.Dec()
+			delete(victimBatch, wa)
+			break
+		}

-		// If we cannot find it, it should have finished watch.
-	})
+		// victim being processed so not accessible; retry
+		s.mu.Unlock()
+		time.Sleep(time.Millisecond)
+	}

-	return wa, cancel
+	watcherGauge.Dec()
+	s.mu.Unlock()
 }

 // syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
@@ -306,8 +323,10 @@ func (s *watchableStore) moveVictims() (moved int) {
 	for _, wb := range victims {
 		// try to send responses again
 		for w, eb := range wb {
+			// watcher has observed the store up to, but not including, w.minRev
+			rev := w.minRev - 1
 			select {
-			case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: w.cur}:
+			case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}:
 				pendingEventsGauge.Add(float64(len(eb.evs)))
 			default:
 				if newVictim == nil {
@@ -328,10 +347,11 @@ func (s *watchableStore) moveVictims() (moved int) {
 				// couldn't send watch response; stays victim
 				continue
 			}
+			w.victim = false
 			if eb.moreRev != 0 {
-				w.cur = eb.moreRev
+				w.minRev = eb.moreRev
 			}
-			if w.cur < curRev {
+			if w.minRev <= curRev {
 				s.unsynced.add(w)
 			} else {
 				slowWatcherGauge.Dec()
@@ -385,17 +405,20 @@ func (s *watchableStore) syncWatchers() {
 	var victims watcherBatch
 	wb := newWatcherBatch(wg, evs)
 	for w := range wg.watchers {
+		w.minRev = curRev + 1
+
 		eb, ok := wb[w]
 		if !ok {
 			// bring un-notified watcher to synced
-			w.cur = curRev
 			s.synced.add(w)
 			s.unsynced.delete(w)
 			continue
 		}

-		w.cur = curRev
-		isBlocked := false
+		if eb.moreRev != 0 {
+			w.minRev = eb.moreRev
+		}
+
 		select {
 		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}:
 			pendingEventsGauge.Add(float64(len(eb.evs)))
@@ -403,14 +426,14 @@ func (s *watchableStore) syncWatchers() {
 			if victims == nil {
 				victims = make(watcherBatch)
 			}
-			isBlocked = true
+			w.victim = true
 		}

-		if isBlocked {
+		if w.victim {
 			victims[w] = eb
 		} else {
 			if eb.moreRev != 0 {
-				w.cur = eb.moreRev
+				// stay unsynced; more to read
 				continue
 			}
 			s.synced.add(w)
@@ -458,14 +481,15 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
 			plog.Panicf("unexpected multiple revisions in notification")
 		}
 		select {
-		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}:
+		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}:
 			pendingEventsGauge.Add(float64(len(eb.evs)))
 		default:
 			// move slow watcher to victims
-			w.cur = rev
+			w.minRev = rev + 1
 			if victim == nil {
 				victim = make(watcherBatch)
 			}
+			w.victim = true
 			victim[w] = eb
 			s.synced.delete(w)
 			slowWatcherGauge.Inc()
@@ -508,12 +532,15 @@ type watcher struct {
 	// If end is set, the watcher is on a range.
 	end []byte

-	// cur is the current watcher revision of a unsynced watcher.
-	// cur will be updated for unsynced watcher while it is catching up.
-	// cur is startRev of a synced watcher.
-	// cur will not be updated for synced watcher.
-	cur int64
-	id  WatchID
+	// victim is set when ch is blocked and undergoing victim processing
+	victim bool
+
+	// compacted is set when the watcher is removed because of compaction
+	compacted bool
+
+	// minRev is the minimum revision update the watcher will accept
+	minRev int64
+	id     WatchID

 	// a chan to send out the watch response.
 	// The chan might be shared with other watchers.

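The new cancelation path spins because a victim batch can be momentarily checked out of s.victims while moveVictims retries its sends; during that window the watcher is in none of the store's holding areas. A simplified sketch of that retry shape, using a hypothetical set type rather than the mvcc watcher groups:

package main

import (
	"sync"
	"time"
)

type watcher struct{ compacted bool }

type set map[*watcher]bool

func (s set) delete(w *watcher) bool {
	if s[w] {
		delete(s, w)
		return true
	}
	return false
}

type store struct {
	mu               sync.Mutex
	synced, unsynced set
	victims          []set // batches being retried by a background pass
}

// cancel loops until the watcher surfaces in one of the holding areas;
// while a victim batch is checked out for processing the watcher is
// briefly in none of them, so the canceler backs off and retries.
func (s *store) cancel(w *watcher) {
	for {
		s.mu.Lock()
		if s.unsynced.delete(w) || s.synced.delete(w) || w.compacted {
			break
		}
		found := false
		for _, vb := range s.victims {
			if vb.delete(w) {
				found = true
				break
			}
		}
		if found {
			break
		}
		s.mu.Unlock()
		time.Sleep(time.Millisecond) // victim in flight; retry shortly
	}
	// still holding s.mu from the iteration that found the watcher
	s.mu.Unlock()
}

func main() {
	w := &watcher{}
	s := &store{synced: set{w: true}, unsynced: set{}}
	s.cancel(w)
}

A side effect of the single watcherGauge.Dec() after the loop: a cancelation that races with victim processing can no longer skip the gauge decrement, which the old silent "should have finished watch" path appeared to allow.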
mvcc/watchable_store_test.go (+2 -2)

@@ -193,8 +193,8 @@ func TestSyncWatchers(t *testing.T) {
 	}

 	for w := range sws {
-		if w.cur != s.Rev() {
-			t.Errorf("w.cur = %d, want %d", w.cur, s.Rev())
+		if w.minRev != s.Rev()+1 {
+			t.Errorf("w.minRev = %d, want %d", w.minRev, s.Rev()+1)
 		}
 	}

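The updated assertion reflects the new bookkeeping: minRev is the first revision the watcher has not yet observed, so after syncing through s.Rev() it must equal s.Rev()+1, and the revision reported to the client (as in moveVictims above) is minRev-1. A toy illustration of the arithmetic, with made-up values:

package main

import "fmt"

func main() {
	// Say the store is at revision 10 and a sync just delivered every
	// event through it. minRev becomes the next revision the watcher
	// needs, and the revision reported back to the client is minRev-1:
	// everything observed up to, but not including, minRev.
	curRev := int64(10)
	minRev := curRev + 1
	fmt.Println("minRev:", minRev, "reported revision:", minRev-1) // 11 10
}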
mvcc/watcher_group.go (+6 -5)

@@ -81,7 +81,7 @@ func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
 	wb := make(watcherBatch)
 	for _, ev := range evs {
 		for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
-			if ev.Kv.ModRevision >= w.cur {
+			if ev.Kv.ModRevision >= w.minRev {
 				// don't double notify
 				wb.add(w, ev)
 			}
@@ -233,20 +233,21 @@ func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watc
 func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
 	minRev := int64(math.MaxInt64)
 	for w := range wg.watchers {
-		if w.cur > curRev {
+		if w.minRev > curRev {
 			panic("watcher current revision should not exceed current revision")
 		}
-		if w.cur < compactRev {
+		if w.minRev < compactRev {
 			select {
 			case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
+				w.compacted = true
 				wg.delete(w)
 			default:
 				// retry next time
 			}
 			continue
 		}
-		if minRev > w.cur {
-			minRev = w.cur
+		if minRev > w.minRev {
+			minRev = w.minRev
 		}
 	}
 	return minRev
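
chooseAll now also marks a watcher as compacted when it hands it a CompactRevision response, which is what lets cancelWatcher's new wa.compacted branch treat the removal as final. A minimal sketch of the compaction rule, with a hypothetical watcher type in place of the mvcc one:

package main

import "fmt"

// watcher is a hypothetical stand-in: minRev and compacted mirror the new
// fields, and ch carries the CompactRevision in place of a WatchResponse.
type watcher struct {
	minRev    int64
	compacted bool
	ch        chan int64
}

// checkCompaction applies the chooseAll rule: a watcher whose next needed
// revision predates the compaction can never be served from the keyspace,
// so it is told the compact revision and marked compacted for good.
func checkCompaction(w *watcher, compactRev int64) bool {
	if w.minRev >= compactRev {
		return false // still serviceable
	}
	select {
	case w.ch <- compactRev:
		w.compacted = true // lets cancelWatcher treat removal as final
		return true
	default:
		return false // channel blocked; retry on the next pass
	}
}

func main() {
	w := &watcher{minRev: 3, ch: make(chan int64, 1)}
	fmt.Println(checkCompaction(w, 5)) // true: 3 < 5, response sent
	fmt.Println(<-w.ch)                // 5
}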