
storage: do not send outdated events to unsynced watchers

Anthony Romano · 9 years ago · commit 2cbf7cf6d1
1 changed file with 58 additions and 57 deletions

storage/watchable_store.go (+58 -57)

@@ -155,6 +155,7 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
 		evs[i] = storagepb.Event{
 			Type: storagepb.DELETE,
 			Kv:   &change}
+		evs[i].Kv.ModRevision = rev
 	}
 	s.notify(rev, evs)
 	return n, rev
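
The one-line fix above is the heart of the commit: a DELETE event reuses the key's last stored KeyValue, whose ModRevision still points at the old write, so without the patch a fresh delete looks older than it is and gets dropped by the `ev.Kv.ModRevision < w.cur` guard added in newWatcherToEventMap below. A minimal sketch of the effect, with hypothetical stand-ins for the storagepb types:

```go
package main

import "fmt"

// Hypothetical stand-ins for storagepb.KeyValue and storagepb.Event.
type KeyValue struct {
	Key         []byte
	ModRevision int64
}

type EventType int

const (
	PUT EventType = iota
	DELETE
)

type Event struct {
	Type EventType
	Kv   *KeyValue
}

func main() {
	// "foo" was last written at revision 3; the delete lands at revision 7.
	change := KeyValue{Key: []byte("foo"), ModRevision: 3}
	rev := int64(7)

	ev := Event{Type: DELETE, Kv: &change}
	ev.Kv.ModRevision = rev // the fix: stamp the deletion revision

	// A watcher synced through revision 5 keeps this event only because
	// ModRevision now reads 7 instead of the stale 3.
	watcherCur := int64(5)
	fmt.Println("delivered:", ev.Kv.ModRevision >= watcherCur) // delivered: true
}
```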
@@ -177,6 +178,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error {
 		return nil
 	}
 
+	rev := s.store.Rev()
 	evs := make([]storagepb.Event, len(changes))
 	for i, change := range changes {
 		switch change.Value {
@@ -184,6 +186,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error {
 			evs[i] = storagepb.Event{
 				Type: storagepb.DELETE,
 				Kv:   &changes[i]}
+			evs[i].Kv.ModRevision = rev
 		default:
 			evs[i] = storagepb.Event{
 				Type: storagepb.PUT,
@@ -191,7 +194,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error {
 		}
 	}
 
-	s.notify(s.store.Rev(), evs)
+	s.notify(rev, evs)
 	s.mu.Unlock()
 
 	return nil
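
Hoisting `rev := s.store.Rev()` above the loop means the same revision is stamped on each delete event and later handed to notify, so the two values cannot drift apart. A sketch of the capture-once pattern, with a hypothetical store type standing in for the real one:

```go
package main

import "fmt"

// store is a hypothetical stand-in for the revision-bearing KV store.
type store struct{ rev int64 }

func (s *store) Rev() int64 { return s.rev }

// buildAndNotify reads the revision once and uses that single value both for
// stamping delete events and for the final notification, so they always agree.
func buildAndNotify(s *store, keys []string) {
	rev := s.Rev() // read once, up front
	evs := make([]string, len(keys))
	for i, k := range keys {
		evs[i] = fmt.Sprintf("DELETE %s@%d", k, rev) // same rev on every event
	}
	fmt.Printf("notify(rev=%d, evs=%v)\n", rev, evs)
}

func main() {
	buildAndNotify(&store{rev: 7}, []string{"a", "b"})
}
```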
@@ -284,12 +287,45 @@ func (s *watchableStore) syncWatchers() {
 	// in order to find key-value pairs from unsynced watchers, we need to
 	// find min revision index, and these revisions can be used to
 	// query the backend store of key-value pairs
-	minRev := int64(math.MaxInt64)
+	prefixes, minRev := s.scanUnsync()
+	curRev := s.store.currentRev.main
+	minBytes, maxBytes := newRevBytes(), newRevBytes()
+	revToBytes(revision{main: minRev}, minBytes)
+	revToBytes(revision{main: curRev + 1}, maxBytes)
+
+	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
+	// values are actual key-value pairs in backend.
+	tx := s.store.b.BatchTx()
+	tx.Lock()
+	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
+	evs := kvsToEvents(revs, vs, s.unsynced, prefixes)
+	tx.Unlock()
 
+	for w, es := range newWatcherToEventMap(s.unsynced, evs) {
+		select {
+		// s.store.Rev also uses Lock, so just return directly
+		case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}:
+			pendingEventsGauge.Add(float64(len(es)))
+		default:
+			// TODO: handle the full unsynced watchers.
+			// continue to process other watchers for now, the full ones
+			// will be processed next time and hopefully it will not be full.
+			continue
+		}
+		w.cur = curRev
+		s.synced.add(w)
+		s.unsynced.delete(w)
+	}
+
+	slowWatcherGauge.Set(float64(len(s.unsynced)))
+}
+
+func (s *watchableStore) scanUnsync() (prefixes map[string]struct{}, minRev int64) {
 	curRev := s.store.currentRev.main
 	compactionRev := s.store.compactMainRev
 
-	prefixes := make(map[string]struct{})
+	prefixes = make(map[string]struct{})
+	minRev = int64(math.MaxInt64)
 	for _, set := range s.unsynced {
 		for w := range set {
 			k := string(w.key)
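
The send inside syncWatchers above is deliberately non-blocking: when a watcher's channel is full, execution falls through to default, the watcher stays in unsynced, and delivery is retried on the next pass, so one slow consumer never stalls the store. The pattern in isolation:

```go
package main

import "fmt"

// trySend delivers msg only if ch has room; a full channel returns false so
// the caller can leave the watcher unsynced and retry on the next pass.
func trySend(ch chan string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "resp-1")) // true: buffer had room
	fmt.Println(trySend(ch, "resp-2")) // false: full, retried next pass
}
```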
@@ -308,7 +344,7 @@ func (s *watchableStore) syncWatchers() {
 				continue
 			}
 
-			if minRev >= w.cur {
+			if minRev > w.cur {
 				minRev = w.cur
 			}
 
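The comparison tightens from `>=` to `>`: assigning when `minRev == w.cur` was a redundant no-op, and the strict form states the actual intent, tracking the smallest revision any live unsynced watcher still needs. A pared-down sketch of that scan, with a simplified watcher and compaction check:

```go
package main

import (
	"fmt"
	"math"
)

// watcher is a hypothetical, pared-down stand-in: cur is the revision it has
// synced through.
type watcher struct{ cur int64 }

// minUnsyncedRev finds the oldest revision any live watcher still needs,
// skipping watchers that fell behind the compaction point.
func minUnsyncedRev(ws []watcher, compactRev int64) int64 {
	minRev := int64(math.MaxInt64)
	for _, w := range ws {
		if w.cur < compactRev {
			continue // compacted away; the real code cancels such watchers
		}
		if minRev > w.cur {
			minRev = w.cur
		}
	}
	return minRev
}

func main() {
	ws := []watcher{{cur: 9}, {cur: 4}, {cur: 2}}
	fmt.Println(minUnsyncedRev(ws, 3)) // 4: the watcher at 2 is behind compaction
}
```
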
@@ -318,60 +354,31 @@ func (s *watchableStore) syncWatchers() {
 		}
 	}
 
-	minBytes, maxBytes := newRevBytes(), newRevBytes()
-	revToBytes(revision{main: minRev}, minBytes)
-	revToBytes(revision{main: curRev + 1}, maxBytes)
-
-	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
-	// values are actual key-value pairs in backend.
-	tx := s.store.b.BatchTx()
-	tx.Lock()
-	ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
-
-	evs := []storagepb.Event{}
+	return prefixes, minRev
+}
 
-	// get the list of all events from all key-value pairs
-	for i, v := range vs {
+// kvsToEvents gets all events for the watchers from all key-value pairs
+func kvsToEvents(revs, vals [][]byte, wsk watcherSetByKey, pfxs map[string]struct{}) (evs []storagepb.Event) {
+	for i, v := range vals {
 		var kv storagepb.KeyValue
 		if err := kv.Unmarshal(v); err != nil {
 			log.Panicf("storage: cannot unmarshal event: %v", err)
 		}
 
 		k := string(kv.Key)
-		if _, ok := s.unsynced.getSetByKey(k); !ok && !matchPrefix(k, prefixes) {
+		if _, ok := wsk.getSetByKey(k); !ok && !matchPrefix(k, pfxs) {
 			continue
 		}
 
-		var ev storagepb.Event
-		switch {
-		case isTombstone(ks[i]):
-			ev.Type = storagepb.DELETE
-		default:
-			ev.Type = storagepb.PUT
-		}
-		ev.Kv = &kv
-
-		evs = append(evs, ev)
-	}
-	tx.Unlock()
-
-	for w, es := range newWatcherToEventMap(s.unsynced, evs) {
-		select {
-		// s.store.Rev also uses Lock, so just return directly
-		case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}:
-			pendingEventsGauge.Add(float64(len(es)))
-		default:
-			// TODO: handle the full unsynced watchers.
-			// continue to process other watchers for now, the full ones
-			// will be processed next time and hopefully it will not be full.
-			continue
+		ty := storagepb.PUT
+		if isTombstone(revs[i]) {
+			ty = storagepb.DELETE
+			// patch in mod revision so watchers won't skip
+			kv.ModRevision = bytesToRev(revs[i]).main
 		}
-		w.cur = curRev
-		s.synced.add(w)
-		s.unsynced.delete(w)
+		evs = append(evs, storagepb.Event{Kv: &kv, Type: ty})
 	}
-
-	slowWatcherGauge.Set(float64(len(s.unsynced)))
+	return evs
 }
 
 // notify notifies the fact that given event at the given rev just happened to
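
kvsToEvents decodes boltdb keys, which are encoded revisions; tombstoned revisions carry a marker, and `bytesToRev(revs[i]).main` recovers the deletion revision that gets patched into kv.ModRevision. Below is an illustrative re-creation of that key layout, not the real revToBytes/bytesToRev/isTombstone; the big-endian encoding is also what lets the earlier [minRev, curRev+1) byte range in syncWatchers scan exactly the wanted revisions in order:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// revToBytes mimics the assumed layout: 8 bytes big-endian main revision, a
// '_' separator, 8 bytes big-endian sub revision; deletes append a trailing
// 't' marker byte.
func revToBytes(main, sub int64, tombstone bool) []byte {
	b := make([]byte, 17, 18)
	binary.BigEndian.PutUint64(b[0:8], uint64(main))
	b[8] = '_'
	binary.BigEndian.PutUint64(b[9:17], uint64(sub))
	if tombstone {
		b = append(b, 't')
	}
	return b
}

func mainRev(b []byte) int64 { return int64(binary.BigEndian.Uint64(b[0:8])) }

func isTombstone(b []byte) bool { return len(b) == 18 && b[17] == 't' }

func main() {
	k := revToBytes(7, 0, true) // a delete recorded at revision 7
	if isTombstone(k) {
		// this is the value kvsToEvents patches into kv.ModRevision
		fmt.Println("tombstone at main revision", mainRev(k)) // 7
	}
}
```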
@@ -426,23 +433,17 @@ func newWatcherToEventMap(sm watcherSetByKey, evs []storagepb.Event) map[*watche
 
 		// check all prefixes of the key to notify all corresponded watchers
 		for i := 0; i <= len(key); i++ {
-			k := string(key[:i])
-
-			wm, ok := sm[k]
-			if !ok {
-				continue
-			}
+			for w := range sm[key[:i]] {
+				// don't double notify
+				if ev.Kv.ModRevision < w.cur {
+					continue
+				}
 
-			for w := range wm {
 				// the watcher needs to be notified when either it watches prefix or
 				// the key is exactly matched.
 				if !w.prefix && i != len(ev.Kv.Key) {
 					continue
 				}
-
-				if _, ok := watcherToEvents[w]; !ok {
-					watcherToEvents[w] = []storagepb.Event{}
-				}
 				watcherToEvents[w] = append(watcherToEvents[w], ev)
 			}
 		}
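
The new `ev.Kv.ModRevision < w.cur` guard is what the commit title promises: a watcher that has already progressed to revision w.cur must not be re-sent events from earlier revisions, which could otherwise happen when notify and syncWatchers both touch the same keys. A pared-down sketch of the fan-out with that guard, using hypothetical stand-in types:

```go
package main

import "fmt"

type KeyValue struct {
	Key         string
	ModRevision int64
}

type Event struct{ Kv *KeyValue }

type watcher struct {
	key    string
	prefix bool
	cur    int64 // revision this watcher has progressed to
}

// fanOut maps each event to the watchers it should reach, dropping events at
// revisions the watcher has already passed so nothing is delivered twice.
func fanOut(ws []*watcher, evs []Event) map[*watcher][]Event {
	out := make(map[*watcher][]Event)
	for _, ev := range evs {
		key := ev.Kv.Key
		// check every prefix of the key, plus the key itself
		for i := 0; i <= len(key); i++ {
			for _, w := range ws {
				if w.key != key[:i] {
					continue
				}
				if ev.Kv.ModRevision < w.cur {
					continue // don't double notify
				}
				if !w.prefix && i != len(key) {
					continue // exact-key watcher; prefix match isn't enough
				}
				out[w] = append(out[w], ev)
			}
		}
	}
	return out
}

func main() {
	w := &watcher{key: "foo", cur: 5}
	evs := []Event{
		{Kv: &KeyValue{Key: "foo", ModRevision: 3}}, // already seen: dropped
		{Kv: &KeyValue{Key: "foo", ModRevision: 7}}, // new: delivered
	}
	for _, es := range fanOut([]*watcher{w}, evs) {
		fmt.Println(len(es), "event(s) at rev", es[0].Kv.ModRevision) // 1 event(s) at rev 7
	}
}
```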