Преглед на файлове

mvcc: limit total watchers synced per sync

Fixes #4567
Anthony Romano преди 9 години
родител
ревизия
080272be17
променени са 2 файла, в които са добавени 53 реда и са изтрити 30 реда
  1. 36 24
      mvcc/watchable_store.go
  2. 17 6
      mvcc/watcher_group.go

+ 36 - 24
mvcc/watchable_store.go

@@ -30,6 +30,9 @@ const (
 	// TODO: find a good buf value. 1024 is just a random one that
 	// seems to be reasonable.
 	chanBufLen = 1024
+
+	// maxWatchersPerSync is the number of watchers to sync in a single batch
+	maxWatchersPerSync = 512
 )
 
 type watchable interface {
@@ -231,36 +234,47 @@ func (s *watchableStore) syncWatchersLoop() {
 
 	for {
 		s.mu.Lock()
+		st := time.Now()
+		lastUnsyncedWatchers := s.unsynced.size()
 		s.syncWatchers()
+		unsyncedWatchers := s.unsynced.size()
 		s.mu.Unlock()
+		syncDuration := time.Since(st)
+
+		waitDuration := 100 * time.Millisecond
+		// more work pending?
+		if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
+			// be fair to other store operations by yielding time taken
+			waitDuration = syncDuration
+		}
 
 		select {
-		case <-time.After(100 * time.Millisecond):
+		case <-time.After(waitDuration):
 		case <-s.stopc:
 			return
 		}
 	}
 }
 
-// syncWatchers periodically syncs unsynced watchers by: Iterate all unsynced
-// watchers to get the minimum revision within its range, skipping the
-// watcher if its current revision is behind the compact revision of the
-// store. And use this minimum revision to get all key-value pairs. Then send
-// those events to watchers.
+// syncWatchers syncs unsynced watchers by:
+//	1. choose a set of watchers from the unsynced watcher group
+//	2. iterate over the set to get the minimum revision and remove compacted watchers
+//	3. use minimum revision to get all key-value pairs and send those events to watchers
+//	4. remove synced watchers in set from unsynced group and move to synced group
 func (s *watchableStore) syncWatchers() {
-	s.store.mu.Lock()
-	defer s.store.mu.Unlock()
-
 	if s.unsynced.size() == 0 {
 		return
 	}
 
+	s.store.mu.Lock()
+	defer s.store.mu.Unlock()
+
 	// in order to find key-value pairs from unsynced watchers, we need to
 	// find min revision index, and these revisions can be used to
 	// query the backend store of key-value pairs
 	curRev := s.store.currentRev.main
 	compactionRev := s.store.compactMainRev
-	minRev := s.unsynced.scanMinRev(curRev, compactionRev)
+	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
 	minBytes, maxBytes := newRevBytes(), newRevBytes()
 	revToBytes(revision{main: minRev}, minBytes)
 	revToBytes(revision{main: curRev + 1}, maxBytes)
@@ -270,15 +284,22 @@ func (s *watchableStore) syncWatchers() {
 	tx := s.store.b.BatchTx()
 	tx.Lock()
 	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
-	evs := kvsToEvents(&s.unsynced, revs, vs)
+	evs := kvsToEvents(wg, revs, vs)
 	tx.Unlock()
 
-	wb := newWatcherBatch(&s.unsynced, evs)
+	wb := newWatcherBatch(wg, evs)
+	for w := range wg.watchers {
+		eb, ok := wb[w]
+		if !ok {
+			// bring un-notified watcher to synced
+			w.cur = curRev
+			s.synced.add(w)
+			s.unsynced.delete(w)
+			continue
+		}
 
-	for w, eb := range wb {
 		select {
-		// s.store.Rev also uses Lock, so just return directly
-		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.store.currentRev.main}:
+		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}:
 			pendingEventsGauge.Add(float64(len(eb.evs)))
 		default:
 			// TODO: handle the full unsynced watchers.
@@ -295,15 +316,6 @@ func (s *watchableStore) syncWatchers() {
 		s.unsynced.delete(w)
 	}
 
-	// bring all un-notified watchers to synced.
-	for w := range s.unsynced.watchers {
-		if !wb.contains(w) {
-			w.cur = curRev
-			s.synced.add(w)
-			s.unsynced.delete(w)
-		}
-	}
-
 	slowWatcherGauge.Set(float64(s.unsynced.size()))
 }
 

+ 17 - 6
mvcc/watcher_group.go

@@ -75,11 +75,6 @@ func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
 	eb.add(ev)
 }
 
-func (wb watcherBatch) contains(w *watcher) bool {
-	_, ok := wb[w]
-	return ok
-}
-
 // newWatcherBatch maps watchers to their matched events. It enables quick
 // events look up by watcher.
 func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
@@ -219,7 +214,23 @@ func (wg *watcherGroup) delete(wa *watcher) bool {
 	return true
 }
 
-func (wg *watcherGroup) scanMinRev(curRev int64, compactRev int64) int64 {
+// choose selects watchers from the watcher group to update
+func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
+	if len(wg.watchers) < maxWatchers {
+		return wg, wg.chooseAll(curRev, compactRev)
+	}
+	ret := newWatcherGroup()
+	for w := range wg.watchers {
+		if maxWatchers <= 0 {
+			break
+		}
+		maxWatchers--
+		ret.add(w)
+	}
+	return &ret, ret.chooseAll(curRev, compactRev)
+}
+
+func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
 	minRev := int64(math.MaxInt64)
 	for w := range wg.watchers {
 		if w.cur > curRev {