Browse Source

Merge pull request #4392 from xiang90/watch

storage: make unsync a watcherSetByKey
Xiang Li 10 years ago
parent
commit
d7f6ad0334

+ 32 - 31
storage/watchable_store.go

@@ -55,6 +55,11 @@ func (w watcherSetByKey) add(wa *watcher) {
 	set.add(wa)
 	set.add(wa)
 }
 }
 
 
+func (w watcherSetByKey) getSetByKey(key string) (watcherSet, bool) {
+	set, ok := w[key]
+	return set, ok
+}
+
 func (w watcherSetByKey) delete(wa *watcher) bool {
 func (w watcherSetByKey) delete(wa *watcher) bool {
 	k := string(wa.key)
 	k := string(wa.key)
 	if v, ok := w[k]; ok {
 	if v, ok := w[k]; ok {
@@ -82,7 +87,7 @@ type watchableStore struct {
 	*store
 	*store
 
 
 	// contains all unsynced watchers that needs to sync with events that have happened
 	// contains all unsynced watchers that needs to sync with events that have happened
-	unsynced watcherSet
+	unsynced watcherSetByKey
 
 
 	// contains all synced watchers that are in sync with the progress of the store.
 	// contains all synced watchers that are in sync with the progress of the store.
 	// The key of the map is the key that the watcher watches on.
 	// The key of the map is the key that the watcher watches on.
@@ -99,7 +104,7 @@ type cancelFunc func()
 func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore {
 func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore {
 	s := &watchableStore{
 	s := &watchableStore{
 		store:    NewStore(b, le),
 		store:    NewStore(b, le),
-		unsynced: make(watcherSet),
+		unsynced: make(watcherSetByKey),
 		synced:   make(watcherSetByKey),
 		synced:   make(watcherSetByKey),
 		stopc:    make(chan struct{}),
 		stopc:    make(chan struct{}),
 	}
 	}
@@ -223,16 +228,15 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch
 		s.synced.add(wa)
 		s.synced.add(wa)
 	} else {
 	} else {
 		slowWatcherGauge.Inc()
 		slowWatcherGauge.Inc()
-		s.unsynced[wa] = struct{}{}
+		s.unsynced.add(wa)
 	}
 	}
 	watcherGauge.Inc()
 	watcherGauge.Inc()
 
 
 	cancel := cancelFunc(func() {
 	cancel := cancelFunc(func() {
 		s.mu.Lock()
 		s.mu.Lock()
 		defer s.mu.Unlock()
 		defer s.mu.Unlock()
-		// remove global references of the watcher
-		if _, ok := s.unsynced[wa]; ok {
-			delete(s.unsynced, wa)
+		// remove references of the watcher
+		if s.unsynced.delete(wa) {
 			slowWatcherGauge.Dec()
 			slowWatcherGauge.Dec()
 			watcherGauge.Dec()
 			watcherGauge.Dec()
 			return
 			return
@@ -285,32 +289,29 @@ func (s *watchableStore) syncWatchers() {
 	curRev := s.store.currentRev.main
 	curRev := s.store.currentRev.main
 	compactionRev := s.store.compactMainRev
 	compactionRev := s.store.compactMainRev
 
 
-	// TODO: change unsynced struct type same to this
-	keyToUnsynced := make(watcherSetByKey)
 	prefixes := make(map[string]struct{})
 	prefixes := make(map[string]struct{})
+	for _, set := range s.unsynced {
+		for w := range set {
+			k := string(w.key)
 
 
-	for w := range s.unsynced {
-		k := string(w.key)
-
-		if w.cur > curRev {
-			panic("watcher current revision should not exceed current revision")
-		}
-
-		if w.cur < compactionRev {
-			// TODO: return error compacted to that watcher instead of
-			// just removing it silently from unsynced.
-			delete(s.unsynced, w)
-			continue
-		}
+			if w.cur > curRev {
+				panic("watcher current revision should not exceed current revision")
+			}
 
 
-		if minRev >= w.cur {
-			minRev = w.cur
-		}
+			if w.cur < compactionRev {
+				// TODO: return error compacted to that watcher instead of
+				// just removing it silently from unsynced.
+				s.unsynced.delete(w)
+				continue
+			}
 
 
-		keyToUnsynced.add(w)
+			if minRev >= w.cur {
+				minRev = w.cur
+			}
 
 
-		if w.prefix {
-			prefixes[k] = struct{}{}
+			if w.prefix {
+				prefixes[k] = struct{}{}
+			}
 		}
 		}
 	}
 	}
 
 
@@ -335,7 +336,7 @@ func (s *watchableStore) syncWatchers() {
 		}
 		}
 
 
 		k := string(kv.Key)
 		k := string(kv.Key)
-		if _, ok := keyToUnsynced[k]; !ok && !matchPrefix(k, prefixes) {
+		if _, ok := s.unsynced.getSetByKey(k); !ok && !matchPrefix(k, prefixes) {
 			continue
 			continue
 		}
 		}
 
 
@@ -351,7 +352,7 @@ func (s *watchableStore) syncWatchers() {
 		evs = append(evs, ev)
 		evs = append(evs, ev)
 	}
 	}
 
 
-	for w, es := range newWatcherToEventMap(keyToUnsynced, evs) {
+	for w, es := range newWatcherToEventMap(s.unsynced, evs) {
 		select {
 		select {
 		// s.store.Rev also uses Lock, so just return directly
 		// s.store.Rev also uses Lock, so just return directly
 		case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}:
 		case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}:
@@ -363,7 +364,7 @@ func (s *watchableStore) syncWatchers() {
 			continue
 			continue
 		}
 		}
 		s.synced.add(w)
 		s.synced.add(w)
-		delete(s.unsynced, w)
+		s.unsynced.delete(w)
 	}
 	}
 
 
 	slowWatcherGauge.Set(float64(len(s.unsynced)))
 	slowWatcherGauge.Set(float64(len(s.unsynced)))
@@ -390,7 +391,7 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
 			default:
 			default:
 				// move slow watcher to unsynced
 				// move slow watcher to unsynced
 				w.cur = rev
 				w.cur = rev
-				s.unsynced[w] = struct{}{}
+				s.unsynced.add(w)
 				delete(wm, w)
 				delete(wm, w)
 				slowWatcherGauge.Inc()
 				slowWatcherGauge.Inc()
 			}
 			}

+ 1 - 1
storage/watchable_store_bench_test.go

@@ -40,7 +40,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	// in unsynced for this benchmark.
 	// in unsynced for this benchmark.
 	ws := &watchableStore{
 	ws := &watchableStore{
 		store:    s,
 		store:    s,
-		unsynced: make(watcherSet),
+		unsynced: make(watcherSetByKey),
 
 
 		// to make the test not crash from assigning to nil map.
 		// to make the test not crash from assigning to nil map.
 		// 'synced' doesn't get populated in this test.
 		// 'synced' doesn't get populated in this test.

+ 2 - 2
storage/watchable_store_test.go

@@ -82,7 +82,7 @@ func TestCancelUnsynced(t *testing.T) {
 	// in unsynced to test if syncWatchers works as expected.
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
 	s := &watchableStore{
 		store:    NewStore(b, &lease.FakeLessor{}),
 		store:    NewStore(b, &lease.FakeLessor{}),
-		unsynced: make(watcherSet),
+		unsynced: make(watcherSetByKey),
 
 
 		// to make the test not crash from assigning to nil map.
 		// to make the test not crash from assigning to nil map.
 		// 'synced' doesn't get populated in this test.
 		// 'synced' doesn't get populated in this test.
@@ -137,7 +137,7 @@ func TestSyncWatchers(t *testing.T) {
 
 
 	s := &watchableStore{
 	s := &watchableStore{
 		store:    NewStore(b, &lease.FakeLessor{}),
 		store:    NewStore(b, &lease.FakeLessor{}),
-		unsynced: make(watcherSet),
+		unsynced: make(watcherSetByKey),
 		synced:   make(watcherSetByKey),
 		synced:   make(watcherSetByKey),
 	}
 	}