Browse Source

storage: limit total unique revisions in unsynced watcher event list

Anthony Romano 9 years ago
parent
commit
7c17665a1a
2 changed files with 112 additions and 18 deletions
  1. 73 13
      storage/watchable_store.go
  2. 39 5
      storage/watchable_store_test.go

+ 73 - 13
storage/watchable_store.go

@@ -34,11 +34,64 @@ const (
 	chanBufLen = 1024
 )
 
+var (
+	// watchBatchMaxRevs is the maximum distinct revisions that
+	// may be sent to an unsynced watcher at a time. Declared as
+	// var instead of const for testing purposes.
+	watchBatchMaxRevs = 1000
+)
+
+type eventBatch struct {
+	// evs is a batch of revision-ordered events
+	evs []storagepb.Event
+	// revs is the minimum unique revisions observed for this batch
+	revs int
+	// moreRev is first revision with more events following this batch
+	moreRev int64
+}
+
 type (
 	watcherSetByKey map[string]watcherSet
 	watcherSet      map[*watcher]struct{}
+	watcherBatch    map[*watcher]*eventBatch
 )
 
+func (eb *eventBatch) add(ev storagepb.Event) {
+	if eb.revs > watchBatchMaxRevs {
+		// maxed out batch size
+		return
+	}
+
+	if len(eb.evs) == 0 {
+		// base case
+		eb.revs = 1
+		eb.evs = append(eb.evs, ev)
+		return
+	}
+
+	// revision accounting
+	ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
+	evRev := ev.Kv.ModRevision
+	if evRev > ebRev {
+		eb.revs++
+		if eb.revs > watchBatchMaxRevs {
+			eb.moreRev = evRev
+			return
+		}
+	}
+
+	eb.evs = append(eb.evs, ev)
+}
+
+func (wb watcherBatch) add(w *watcher, ev storagepb.Event) {
+	eb := wb[w]
+	if eb == nil {
+		eb = &eventBatch{}
+		wb[w] = eb
+	}
+	eb.add(ev)
+}
+
 func (w watcherSet) add(wa *watcher) {
 	if _, ok := w[wa]; ok {
 		panic("add watcher twice!")
@@ -310,17 +363,21 @@ func (s *watchableStore) syncWatchers() {
 	evs := kvsToEvents(revs, vs, s.unsynced, prefixes)
 	tx.Unlock()
 
-	for w, es := range newWatcherToEventMap(s.unsynced, evs) {
+	for w, eb := range newWatcherBatch(s.unsynced, evs) {
 		select {
 		// s.store.Rev also uses Lock, so just return directly
-		case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}:
-			pendingEventsGauge.Add(float64(len(es)))
+		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.store.currentRev.main}:
+			pendingEventsGauge.Add(float64(len(eb.evs)))
 		default:
 			// TODO: handle the full unsynced watchers.
 			// continue to process other watchers for now, the full ones
 			// will be processed next time and hopefully it will not be full.
 			continue
 		}
+		if eb.moreRev != 0 {
+			w.cur = eb.moreRev
+			continue
+		}
 		w.cur = curRev
 		s.synced.add(w)
 		s.unsynced.delete(w)
@@ -393,16 +450,19 @@ func kvsToEvents(revs, vals [][]byte, wsk watcherSetByKey, pfxs map[string]struc
 // notify notifies the fact that given event at the given rev just happened to
 // watchers that watch on the key of the event.
 func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
-	we := newWatcherToEventMap(s.synced, evs)
+	we := newWatcherBatch(s.synced, evs)
 	for _, wm := range s.synced {
 		for w := range wm {
-			es, ok := we[w]
+			eb, ok := we[w]
 			if !ok {
 				continue
 			}
+			if eb.revs != 1 {
+				panic("unexpected multiple revisions in notification")
+			}
 			select {
-			case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.Rev()}:
-				pendingEventsGauge.Add(float64(len(es)))
+			case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}:
+				pendingEventsGauge.Add(float64(len(eb.evs)))
 			default:
 				// move slow watcher to unsynced
 				w.cur = rev
@@ -433,10 +493,10 @@ type watcher struct {
 	ch chan<- WatchResponse
 }
 
-// newWatcherToEventMap creates a map that has watcher as key and events as
-// value. It enables quick events look up by watcher.
-func newWatcherToEventMap(sm watcherSetByKey, evs []storagepb.Event) map[*watcher][]storagepb.Event {
-	watcherToEvents := make(map[*watcher][]storagepb.Event)
+// newWatcherBatch maps watchers to their matched events. It enables quick
+// events look up by watcher.
+func newWatcherBatch(sm watcherSetByKey, evs []storagepb.Event) watcherBatch {
+	wb := make(watcherBatch)
 	for _, ev := range evs {
 		key := string(ev.Kv.Key)
 
@@ -453,12 +513,12 @@ func newWatcherToEventMap(sm watcherSetByKey, evs []storagepb.Event) map[*watche
 				if !w.prefix && i != len(ev.Kv.Key) {
 					continue
 				}
-				watcherToEvents[w] = append(watcherToEvents[w], ev)
+				wb.add(w, ev)
 			}
 		}
 	}
 
-	return watcherToEvents
+	return wb
 }
 
 // matchPrefix returns true if key has any matching prefix

+ 39 - 5
storage/watchable_store_test.go

@@ -255,6 +255,40 @@ func TestWatchCompacted(t *testing.T) {
 	}
 }
 
+// TestWatchBatchUnsynced tests batching on unsynced watchers
+func TestWatchBatchUnsynced(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b, &lease.FakeLessor{})
+
+	oldMaxRevs := watchBatchMaxRevs
+	defer func() {
+		watchBatchMaxRevs = oldMaxRevs
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+	batches := 3
+	watchBatchMaxRevs = 4
+
+	v := []byte("foo")
+	for i := 0; i < watchBatchMaxRevs*batches; i++ {
+		s.Put(v, v, lease.NoLease)
+	}
+
+	w := s.NewWatchStream()
+	w.Watch(v, false, 1)
+	for i := 0; i < batches; i++ {
+		if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs {
+			t.Fatalf("len(events) = %d, want %d", len(resp.Events), watchBatchMaxRevs)
+		}
+	}
+
+	s.store.mu.Lock()
+	defer s.store.mu.Unlock()
+	if len(s.synced) != 1 {
+		t.Errorf("synced size = %d, want 1", len(s.synced))
+	}
+}
+
 func TestNewMapwatcherToEventMap(t *testing.T) {
 	k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
 	v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")
@@ -341,16 +375,16 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		gwe := newWatcherToEventMap(tt.sync, tt.evs)
+		gwe := newWatcherBatch(tt.sync, tt.evs)
 		if len(gwe) != len(tt.wwe) {
 			t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe))
 		}
 		// compare gwe and tt.wwe
-		for w, mevs := range gwe {
-			if len(mevs) != len(tt.wwe[w]) {
-				t.Errorf("#%d: len(mevs) got = %d, want = %d", i, len(mevs), len(tt.wwe[w]))
+		for w, eb := range gwe {
+			if len(eb.evs) != len(tt.wwe[w]) {
+				t.Errorf("#%d: len(eb.evs) got = %d, want = %d", i, len(eb.evs), len(tt.wwe[w]))
 			}
-			if !reflect.DeepEqual(mevs, tt.wwe[w]) {
+			if !reflect.DeepEqual(eb.evs, tt.wwe[w]) {
 				t.Errorf("#%d: reflect.DeepEqual events got = %v, want = true", i, false)
 			}
 		}