
mvcc: tighten up watcher cancelation and revision handling

Makes w.cur into w.minRev, the minimum revision for the next update, and
retries cancelation if the watcher isn't found (because it's being processed
by moveVictims).

Fixes: #5459
Anthony Romano, 9 years ago
commit cfb3f96c2b

3 changed files with 76 additions and 48 deletions
  1. mvcc/watchable_store.go (+68 -41)
  2. mvcc/watchable_store_test.go (+2 -2)
  3. mvcc/watcher_group.go (+6 -5)
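
The heart of the patch is the cancelation retry described above: if a watcher is in neither the unsynced nor the synced group and wasn't removed by compaction, its victim batch is temporarily owned by moveVictims, so cancelation backs off and retries rather than silently returning. A minimal, self-contained sketch of that pattern in Go (the watcher and store types here are simplified stand-ins, not the mvcc package's real watcherGroup/watcherBatch machinery):

package main

import (
	"fmt"
	"sync"
	"time"
)

type watcher struct {
	victim    bool // ch blocked; batch parked in s.victims
	compacted bool // watcher already removed due to compaction
}

type store struct {
	mu      sync.Mutex
	synced  map[*watcher]bool
	victims []map[*watcher]bool // blocked batches awaiting retry
}

// cancelWatcher keeps retrying until it finds the watcher somewhere:
// in the synced set, marked compacted, or parked in a victim batch.
// If none match, moveVictims temporarily owns the batch, so release
// the lock, sleep briefly, and try again.
func (s *store) cancelWatcher(w *watcher) {
	for {
		s.mu.Lock()
		if s.synced[w] {
			delete(s.synced, w)
			break
		}
		if w.compacted {
			break
		}
		if !w.victim {
			panic("watcher not victim but not in any group")
		}
		found := false
		for _, wb := range s.victims {
			if wb[w] {
				delete(wb, w)
				found = true
				break
			}
		}
		if found {
			break
		}
		// victim batch is being processed; not reachable right now
		s.mu.Unlock()
		time.Sleep(time.Millisecond)
	}
	s.mu.Unlock()
}

func main() {
	w := &watcher{}
	s := &store{synced: map[*watcher]bool{w: true}}
	s.cancelWatcher(w)
	fmt.Println("still synced:", s.synced[w]) // false: reference removed
}

The 1ms sleep mirrors the value in the patch below; any short backoff works, since a victim batch is only out of reach for the duration of one moveVictims pass.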

mvcc/watchable_store.go (+68 -41)

@@ -190,19 +190,19 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
 	defer s.mu.Unlock()
 
 	wa := &watcher{
-		key: key,
-		end: end,
-		cur: startRev,
-		id:  id,
-		ch:  ch,
+		key:    key,
+		end:    end,
+		minRev: startRev,
+		id:     id,
+		ch:     ch,
 	}
 
 	s.store.mu.Lock()
 	synced := startRev > s.store.currentRev.main || startRev == 0
 	if synced {
-		wa.cur = s.store.currentRev.main + 1
-		if startRev > wa.cur {
-			wa.cur = startRev
+		wa.minRev = s.store.currentRev.main + 1
+		if startRev > wa.minRev {
+			wa.minRev = startRev
 		}
 	}
 	s.store.mu.Unlock()
@@ -214,30 +214,47 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
 	}
 	watcherGauge.Inc()
 
-	cancel := cancelFunc(func() {
+	return wa, func() { s.cancelWatcher(wa) }
+}
+
+// cancelWatcher removes references of the watcher from the watchableStore
+func (s *watchableStore) cancelWatcher(wa *watcher) {
+	for {
 		s.mu.Lock()
-		// remove references of the watcher
+
 		if s.unsynced.delete(wa) {
 			slowWatcherGauge.Dec()
-			watcherGauge.Dec()
+			break
 		} else if s.synced.delete(wa) {
-			watcherGauge.Dec()
-		} else {
-			for _, wb := range s.victims {
-				if wb[wa] != nil {
-					slowWatcherGauge.Dec()
-					watcherGauge.Dec()
-					delete(wb, wa)
-					break
-				}
+			break
+		} else if wa.compacted {
+			break
+		}
+
+		if !wa.victim {
+			panic("watcher not victim but not in watch groups")
+		}
+
+		var victimBatch watcherBatch
+		for _, wb := range s.victims {
+			if wb[wa] != nil {
+				victimBatch = wb
+				break
 			}
 		}
-		s.mu.Unlock()
+		if victimBatch != nil {
+			slowWatcherGauge.Dec()
+			delete(victimBatch, wa)
+			break
+		}
 
-		// If we cannot find it, it should have finished watch.
-	})
+		// victim being processed so not accessible; retry
+		s.mu.Unlock()
+		time.Sleep(time.Millisecond)
+	}
 
-	return wa, cancel
+	watcherGauge.Dec()
+	s.mu.Unlock()
 }
 
 // syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
@@ -306,8 +323,10 @@ func (s *watchableStore) moveVictims() (moved int) {
 	for _, wb := range victims {
 		// try to send responses again
 		for w, eb := range wb {
+			// watcher has observed the store up to, but not including, w.minRev
+			rev := w.minRev - 1
 			select {
-			case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: w.cur}:
+			case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}:
 				pendingEventsGauge.Add(float64(len(eb.evs)))
 			default:
 				if newVictim == nil {
@@ -328,10 +347,11 @@ func (s *watchableStore) moveVictims() (moved int) {
 				// couldn't send watch response; stays victim
 				continue
 			}
+			w.victim = false
 			if eb.moreRev != 0 {
-				w.cur = eb.moreRev
+				w.minRev = eb.moreRev
 			}
-			if w.cur < curRev {
+			if w.minRev <= curRev {
 				s.unsynced.add(w)
 			} else {
 				slowWatcherGauge.Dec()
@@ -385,17 +405,20 @@ func (s *watchableStore) syncWatchers() {
 	var victims watcherBatch
 	wb := newWatcherBatch(wg, evs)
 	for w := range wg.watchers {
+		w.minRev = curRev + 1
+
 		eb, ok := wb[w]
 		if !ok {
 			// bring un-notified watcher to synced
-			w.cur = curRev
 			s.synced.add(w)
 			s.unsynced.delete(w)
 			continue
 		}
 
-		w.cur = curRev
-		isBlocked := false
+		if eb.moreRev != 0 {
+			w.minRev = eb.moreRev
+		}
+
 		select {
 		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}:
 			pendingEventsGauge.Add(float64(len(eb.evs)))
@@ -403,14 +426,14 @@ func (s *watchableStore) syncWatchers() {
 			if victims == nil {
 				victims = make(watcherBatch)
 			}
-			isBlocked = true
+			w.victim = true
 		}
 
-		if isBlocked {
+		if w.victim {
 			victims[w] = eb
 		} else {
 			if eb.moreRev != 0 {
-				w.cur = eb.moreRev
+				// stay unsynced; more to read
 				continue
 			}
 			s.synced.add(w)
@@ -458,14 +481,15 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
 			plog.Panicf("unexpected multiple revisions in notification")
 		}
 		select {
-		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}:
+		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}:
 			pendingEventsGauge.Add(float64(len(eb.evs)))
 		default:
 			// move slow watcher to victims
-			w.cur = rev
+			w.minRev = rev + 1
 			if victim == nil {
 				victim = make(watcherBatch)
 			}
+			w.victim = true
 			victim[w] = eb
 			s.synced.delete(w)
 			slowWatcherGauge.Inc()
@@ -508,12 +532,15 @@ type watcher struct {
 	// If end is set, the watcher is on a range.
 	end []byte
 
-	// cur is the current watcher revision of a unsynced watcher.
-	// cur will be updated for unsynced watcher while it is catching up.
-	// cur is startRev of a synced watcher.
-	// cur will not be updated for synced watcher.
-	cur int64
-	id  WatchID
+	// victim is set when ch is blocked and undergoing victim processing
+	victim bool
+
+	// compacted is set when the watcher is removed because of compaction
+	compacted bool
+
+	// minRev is the minimum revision update the watcher will accept
+	minRev int64
+	id     WatchID
 
 	// a chan to send out the watch response.
 	// The chan might be shared with other watchers.

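The renamed field keeps one invariant across all three files: a watcher has observed the store up to, but not including, minRev, so an event is delivered only when its ModRevision is at least minRev, and a response covering the store at curRev advances minRev to curRev+1. A small stand-alone illustration of that gate (plain structs standing in for mvccpb.Event and the real watcher):

package main

import "fmt"

type event struct{ ModRevision int64 }

type watcher struct{ minRev int64 }

// deliver returns the events the watcher hasn't seen yet and advances
// minRev past curRev, the store revision this response covers, so the
// same events are never double-notified.
func deliver(w *watcher, evs []event, curRev int64) []event {
	var out []event
	for _, ev := range evs {
		if ev.ModRevision >= w.minRev { // same check as newWatcherBatch
			out = append(out, ev)
		}
	}
	w.minRev = curRev + 1 // observed up to, but not including, minRev
	return out
}

func main() {
	w := &watcher{minRev: 3}
	evs := []event{{1}, {2}, {3}, {4}}
	fmt.Println(deliver(w, evs, 4)) // [{3} {4}]; minRev is now 5
	fmt.Println(w.minRev)
}
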
mvcc/watchable_store_test.go (+2 -2)

@@ -193,8 +193,8 @@ func TestSyncWatchers(t *testing.T) {
 	}
 
 	for w := range sws {
-		if w.cur != s.Rev() {
-			t.Errorf("w.cur = %d, want %d", w.cur, s.Rev())
+		if w.minRev != s.Rev()+1 {
+			t.Errorf("w.minRev = %d, want %d", w.minRev, s.Rev()+1)
 		}
 	}
 

mvcc/watcher_group.go (+6 -5)

@@ -81,7 +81,7 @@ func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
 	wb := make(watcherBatch)
 	for _, ev := range evs {
 		for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
-			if ev.Kv.ModRevision >= w.cur {
+			if ev.Kv.ModRevision >= w.minRev {
 				// don't double notify
 				wb.add(w, ev)
 			}
@@ -233,20 +233,21 @@ func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watc
 func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
 	minRev := int64(math.MaxInt64)
 	for w := range wg.watchers {
-		if w.cur > curRev {
+		if w.minRev > curRev {
 			panic("watcher current revision should not exceed current revision")
 		}
-		if w.cur < compactRev {
+		if w.minRev < compactRev {
 			select {
 			case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
+				w.compacted = true
 				wg.delete(w)
 			default:
 				// retry next time
 			}
 			continue
 		}
-		if minRev > w.cur {
-			minRev = w.cur
+		if minRev > w.minRev {
+			minRev = w.minRev
 		}
 	}
 	return minRev
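
Finally, the chooseAll change ties compaction into the new cancelation path: a watcher whose minRev fell behind compactRev is flagged compacted after it is notified, which is exactly the state cancelWatcher's loop checks before giving up its search. A rough sketch of that branch, again with simplified stand-in types (the real method also sends a CompactRevision response on w.ch and only marks the watcher compacted when that send succeeds):

package main

import (
	"fmt"
	"math"
)

type watcher struct {
	minRev    int64
	compacted bool
}

// scan mirrors the shape of chooseAll: watchers behind compactRev are
// marked compacted and dropped; everyone else contributes to the
// minimum revision the next sync pass must read from.
func scan(ws []*watcher, curRev, compactRev int64) int64 {
	minRev := int64(math.MaxInt64)
	for _, w := range ws {
		if w.minRev > curRev {
			panic("watcher minRev should not exceed current revision")
		}
		if w.minRev < compactRev {
			w.compacted = true // real code first notifies w.ch
			continue
		}
		if w.minRev < minRev {
			minRev = w.minRev
		}
	}
	return minRev
}

func main() {
	ws := []*watcher{{minRev: 2}, {minRev: 5}, {minRev: 7}}
	fmt.Println(scan(ws, 9, 4)) // 5; the first watcher is compacted
	fmt.Println("compacted:", ws[0].compacted)
}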