Browse Source

Merge pull request #8427 from gyuho/mvcc-patch-cherry-pick

mvcc: sending events after restore
Gyu-Ho Lee 8 years ago
parent
commit
7e6a0a8f92
2 changed files with 48 additions and 0 deletions
  1. 15 0
      mvcc/watchable_store.go
  2. 33 0
      mvcc/watchable_store_test.go

+ 15 - 0
mvcc/watchable_store.go

@@ -180,6 +180,21 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
 	s.mu.Unlock()
 }
 
+func (s *watchableStore) Restore(b backend.Backend) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	err := s.store.Restore(b)
+	if err != nil {
+		return err
+	}
+
+	for wa := range s.synced.watchers {
+		s.unsynced.watchers.add(wa)
+	}
+	s.synced = newWatcherGroup()
+	return nil
+}
+
 // syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
 func (s *watchableStore) syncWatchersLoop() {
 	defer s.wg.Done()

+ 33 - 0
mvcc/watchable_store_test.go

@@ -296,6 +296,39 @@ func TestWatchFutureRev(t *testing.T) {
 	}
 }
 
+func TestWatchRestore(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+	defer cleanup(s, b, tmpPath)
+
+	testKey := []byte("foo")
+	testValue := []byte("bar")
+	rev := s.Put(testKey, testValue, lease.NoLease)
+
+	newBackend, newPath := backend.NewDefaultTmpBackend()
+	newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
+	defer cleanup(newStore, newBackend, newPath)
+
+	w := newStore.NewWatchStream()
+	w.Watch(testKey, nil, rev-1)
+
+	newStore.Restore(b)
+	select {
+	case resp := <-w.Chan():
+		if resp.Revision != rev {
+			t.Fatalf("rev = %d, want %d", resp.Revision, rev)
+		}
+		if len(resp.Events) != 1 {
+			t.Fatalf("failed to get events from the response")
+		}
+		if resp.Events[0].Kv.ModRevision != rev {
+			t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
+		}
+	case <-time.After(time.Second):
+		t.Fatal("failed to receive event in 1 second.")
+	}
+}
+
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 func TestWatchBatchUnsynced(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()