Browse Source

Merge pull request #4396 from xiang90/fix_watch

storage: update watch.cur and fix tests
Xiang Li 10 years ago
parent
commit
456975f631
2 changed files with 33 additions and 31 deletions
  1. 1 0
      storage/watchable_store.go
  2. 32 31
      storage/watchable_store_test.go

+ 1 - 0
storage/watchable_store.go

@@ -366,6 +366,7 @@ func (s *watchableStore) syncWatchers() {
 			// will be processed next time and hopefully it will not be full.
 			continue
 		}
+		w.cur = curRev
 		s.synced.add(w)
 		s.unsynced.delete(w)
 	}

+ 32 - 31
storage/watchable_store_test.go

@@ -157,51 +157,52 @@ func TestSyncWatchers(t *testing.T) {
 	watcherN := 100
 
 	for i := 0; i < watcherN; i++ {
-		// use 1 to keep watchers in unsynced
+		// specify rev as 1 to keep watchers in unsynced
 		w.Watch(testKey, true, 1)
 	}
 
-	// Before running s.syncWatchers()
-	//
-	// synced should be empty
-	// because we manually populate unsynced only
-	if len(s.synced[string(testKey)]) != 0 {
-		t.Fatalf("synced[string(testKey)] size = %d, want 0", len(s.synced[string(testKey)]))
+	// Before running s.syncWatchers() synced should be empty because we manually
+	// populate unsynced only
+	sws, _ := s.synced.getSetByKey(string(testKey))
+	uws, _ := s.unsynced.getSetByKey(string(testKey))
+
+	if len(sws) != 0 {
+		t.Fatalf("synced[string(testKey)] size = %d, want 0", len(sws))
 	}
-	// unsynced should not be empty
-	// because we manually populated unsynced only
-	if len(s.unsynced) == 0 {
-		t.Errorf("unsynced size = %d, want %d", len(s.unsynced), watcherN)
+	// unsynced should not be empty because we manually populated unsynced only
+	if len(uws) != watcherN {
+		t.Errorf("unsynced size = %d, want %d", len(uws), watcherN)
 	}
 
-	// this should move all unsynced watchers
-	// to synced ones
+	// this should move all unsynced watchers to synced ones
 	s.syncWatchers()
 
-	// After running s.syncWatchers()
-	//
-	// synced should not be empty
-	// because syncwatchers populates synced
-	// in this test case
-	if len(s.synced[string(testKey)]) == 0 {
-		t.Errorf("synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)]))
+	sws, _ = s.synced.getSetByKey(string(testKey))
+	uws, _ = s.unsynced.getSetByKey(string(testKey))
+
+	// After running s.syncWatchers(), synced should not be empty because syncwatchers
+	// populates synced in this test case
+	if len(sws) != watcherN {
+		t.Errorf("synced[string(testKey)] size = %d, want %d", len(sws), watcherN)
 	}
-	// unsynced should be empty
-	// because syncwatchers is expected to move
-	// all watchers from unsynced to synced
-	// in this test case
-	if len(s.unsynced) != 0 {
-		t.Errorf("unsynced size = %d, want 0", len(s.unsynced))
+
+	// unsynced should be empty because syncwatchers is expected to move all watchers
+	// from unsynced to synced in this test case
+	if len(uws) != 0 {
+		t.Errorf("unsynced size = %d, want 0", len(uws))
+	}
+
+	for w := range sws {
+		if w.cur != s.Rev() {
+			t.Errorf("w.cur = %d, want %d", w.cur, s.Rev())
+		}
 	}
 
-	// All of the watchers actually share one channel
-	// so we only need to check one shared channel
-	// (See watcher.go for more detail).
 	if len(w.(*watchStream).ch) != watcherN {
 		t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN)
 	}
-	wr := <-w.(*watchStream).ch
-	evs := wr.Events
+
+	evs := (<-w.(*watchStream).ch).Events
 	if len(evs) != 1 {
 		t.Errorf("len(evs) got = %d, want = 1", len(evs))
 	}