瀏覽代碼

mvcc: restore unsynced watchers

In case syncWatchersLoop() starts before Restore() is called,
watchers already added by that moment are moved to s.synced by the loop.
However, there is a broken logic that moves watchers from s.synced
to s.uncyned without setting keyWatchers of the watcherGroup.
Eventually syncWatchers() fails to pickup those watchers from s.unsynced
and no events are sent to the watchers, because newWatcherBatch() called
in the function uses wg.watcherSetByKey() internally that requires
a proper keyWatchers value.
Iwasaki Yudai 8 年之前
父節點
當前提交
eaf7d631ad
共有 2 個文件被更改,包括 34 次插入25 次删除
  1. 1 1
      mvcc/watchable_store.go
  2. 33 24
      mvcc/watchable_store_test.go

+ 1 - 1
mvcc/watchable_store.go

@@ -192,7 +192,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
 	}
 
 	for wa := range s.synced.watchers {
-		s.unsynced.watchers.add(wa)
+		s.unsynced.add(wa)
 	}
 	s.synced = newWatcherGroup()
 	return nil

+ 33 - 24
mvcc/watchable_store_test.go

@@ -297,36 +297,45 @@ func TestWatchFutureRev(t *testing.T) {
 }
 
 func TestWatchRestore(t *testing.T) {
-	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
-	defer cleanup(s, b, tmpPath)
+	test := func(delay time.Duration) func(t *testing.T) {
+		return func(t *testing.T) {
+			b, tmpPath := backend.NewDefaultTmpBackend()
+			s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+			defer cleanup(s, b, tmpPath)
 
-	testKey := []byte("foo")
-	testValue := []byte("bar")
-	rev := s.Put(testKey, testValue, lease.NoLease)
+			testKey := []byte("foo")
+			testValue := []byte("bar")
+			rev := s.Put(testKey, testValue, lease.NoLease)
 
-	newBackend, newPath := backend.NewDefaultTmpBackend()
-	newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
-	defer cleanup(newStore, newBackend, newPath)
+			newBackend, newPath := backend.NewDefaultTmpBackend()
+			newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
+			defer cleanup(newStore, newBackend, newPath)
 
-	w := newStore.NewWatchStream()
-	w.Watch(testKey, nil, rev-1)
+			w := newStore.NewWatchStream()
+			w.Watch(testKey, nil, rev-1)
 
-	newStore.Restore(b)
-	select {
-	case resp := <-w.Chan():
-		if resp.Revision != rev {
-			t.Fatalf("rev = %d, want %d", resp.Revision, rev)
-		}
-		if len(resp.Events) != 1 {
-			t.Fatalf("failed to get events from the response")
-		}
-		if resp.Events[0].Kv.ModRevision != rev {
-			t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
+			time.Sleep(delay)
+
+			newStore.Restore(b)
+			select {
+			case resp := <-w.Chan():
+				if resp.Revision != rev {
+					t.Fatalf("rev = %d, want %d", resp.Revision, rev)
+				}
+				if len(resp.Events) != 1 {
+					t.Fatalf("failed to get events from the response")
+				}
+				if resp.Events[0].Kv.ModRevision != rev {
+					t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
+				}
+			case <-time.After(time.Second):
+				t.Fatal("failed to receive event in 1 second.")
+			}
 		}
-	case <-time.After(time.Second):
-		t.Fatal("failed to receive event in 1 second.")
 	}
+
+	t.Run("Normal", test(0))
+	t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
 }
 
 // TestWatchBatchUnsynced tests batching on unsynced watchers