|
@@ -339,6 +339,62 @@ func TestWatchRestore(t *testing.T) {
|
|
|
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
|
|
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// TestWatchRestoreSyncedWatcher tests such a case that:
|
|
|
|
|
+// 1. watcher is created with a future revision "math.MaxInt64 - 2"
|
|
|
|
|
+// 2. watcher with a future revision is added to "synced" watcher group
|
|
|
|
|
+// 3. restore/overwrite storage with snapshot of a higher lasat revision
|
|
|
|
|
+// 4. restore operation moves "synced" to "unsynced" watcher group
|
|
|
|
|
+// 5. choose the watcher from step 1, without panic
|
|
|
|
|
+func TestWatchRestoreSyncedWatcher(t *testing.T) {
|
|
|
|
|
+ b1, b1Path := backend.NewDefaultTmpBackend()
|
|
|
|
|
+ s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil)
|
|
|
|
|
+ defer cleanup(s1, b1, b1Path)
|
|
|
|
|
+
|
|
|
|
|
+ b2, b2Path := backend.NewDefaultTmpBackend()
|
|
|
|
|
+ s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil)
|
|
|
|
|
+ defer cleanup(s2, b2, b2Path)
|
|
|
|
|
+
|
|
|
|
|
+ testKey, testValue := []byte("foo"), []byte("bar")
|
|
|
|
|
+ rev := s1.Put(testKey, testValue, lease.NoLease)
|
|
|
|
|
+ startRev := rev + 2
|
|
|
|
|
+
|
|
|
|
|
+ // create a watcher with a future revision
|
|
|
|
|
+ // add to "synced" watcher group (startRev > s.store.currentRev)
|
|
|
|
|
+ w1 := s1.NewWatchStream()
|
|
|
|
|
+ w1.Watch(0, testKey, nil, startRev)
|
|
|
|
|
+
|
|
|
|
|
+ // make "s2" ends up with a higher last revision
|
|
|
|
|
+ s2.Put(testKey, testValue, lease.NoLease)
|
|
|
|
|
+ s2.Put(testKey, testValue, lease.NoLease)
|
|
|
|
|
+
|
|
|
|
|
+ // overwrite storage with higher revisions
|
|
|
|
|
+ if err := s1.Restore(b2); err != nil {
|
|
|
|
|
+ t.Fatal(err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // wait for next "syncWatchersLoop" iteration
|
|
|
|
|
+ // and the unsynced watcher should be chosen
|
|
|
|
|
+ time.Sleep(2 * time.Second)
|
|
|
|
|
+
|
|
|
|
|
+ // trigger events for "startRev"
|
|
|
|
|
+ s1.Put(testKey, testValue, lease.NoLease)
|
|
|
|
|
+
|
|
|
|
|
+ select {
|
|
|
|
|
+ case resp := <-w1.Chan():
|
|
|
|
|
+ if resp.Revision != startRev {
|
|
|
|
|
+ t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
|
|
|
|
|
+ }
|
|
|
|
|
+ if len(resp.Events) != 1 {
|
|
|
|
|
+ t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
|
|
|
|
|
+ }
|
|
|
|
|
+ if resp.Events[0].Kv.ModRevision != startRev {
|
|
|
|
|
+ t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
|
|
|
|
|
+ }
|
|
|
|
|
+ case <-time.After(time.Second):
|
|
|
|
|
+ t.Fatal("failed to receive event in 1 second")
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
|
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
|
|
func TestWatchBatchUnsynced(t *testing.T) {
|
|
func TestWatchBatchUnsynced(t *testing.T) {
|
|
|
b, tmpPath := backend.NewDefaultTmpBackend()
|
|
b, tmpPath := backend.NewDefaultTmpBackend()
|