
integration: account for unsynced server in TestWatchResumeCompacted

The watch's etcd server is shut down to keep the watch in a retry state
while keys are put and compacted on the cluster. When the server restarts,
there is a window where the compaction hasn't been applied yet, which may
cause the watch to receive all events instead of only a compaction error.

Fixes #6535
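
For context, here is a minimal client-side sketch (not part of this commit) of the
resume-after-compaction pattern the test exercises: a watch opened with
clientv3.WithRev at an already-compacted revision reports rpctypes.ErrCompacted
and its channel closes, so the caller re-opens the watch from the reported
CompactRevision. The endpoint, key, and starting revision are placeholders, and
the import paths assume the coreos/etcd layout used at the time of this commit.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // placeholder endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	rev := int64(2) // placeholder start revision; it may already be compacted
	for {
		wch := cli.Watch(context.Background(), "foo", clientv3.WithRev(rev))
		for wresp := range wch {
			if wresp.Err() == rpctypes.ErrCompacted {
				// the requested revision was compacted out of the store; the
				// server cancels this watch, so resume from the compact revision
				rev = wresp.CompactRevision
				break
			}
			for _, ev := range wresp.Events {
				rev = ev.Kv.ModRevision + 1
				fmt.Printf("%s %q modRev=%d\n", ev.Type, ev.Kv.Key, ev.Kv.ModRevision)
			}
		}
		// channel closed (compaction, cancel, or client shutdown); loop re-watches
	}
}

The outer loop re-creates the watch channel because a compaction error cancels
the server-side watcher and closes the channel, which is exactly the closed-channel
behavior the updated test checks for.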
Anthony Romano, 9 years ago
parent commit 8f3abda5b8
1 changed file with 45 additions and 10 deletions

clientv3/integration/watch_test.go (+45 -10)

@@ -349,6 +349,9 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
 
 // TestWatchResumeCompacted checks that the watcher gracefully closes in case
 // that it tries to resume to a revision that's been compacted out of the store.
+// Since the watcher's server restarts with stale data, the watcher will receive
+// either a compaction error or all keys by staying in sync before the compaction
+// is finally applied.
 func TestWatchResumeCompacted(t *testing.T) {
 	defer testutil.AfterTest(t)
 
@@ -377,8 +380,9 @@ func TestWatchResumeCompacted(t *testing.T) {
 	}
 
 	// put some data and compact away
+	numPuts := 5
 	kv := clientv3.NewKV(clus.Client(1))
-	for i := 0; i < 5; i++ {
+	for i := 0; i < numPuts; i++ {
 		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
 			t.Fatal(err)
 		}
@@ -389,17 +393,48 @@ func TestWatchResumeCompacted(t *testing.T) {
 
 	clus.Members[0].Restart(t)
 
-	// get compacted error message
-	wresp, ok := <-wch
-	if !ok {
-		t.Fatalf("expected wresp, but got closed channel")
+	// since watch's server isn't guaranteed to be synced with the cluster when
+	// the watch resumes, there is a window where the watch can stay synced and
+	// read off all events; if the watcher misses the window, it will go out of
+	// sync and get a compaction error.
+	wRev := int64(2)
+	for int(wRev) <= numPuts+1 {
+		var wresp clientv3.WatchResponse
+		var ok bool
+		select {
+		case wresp, ok = <-wch:
+			if !ok {
+				t.Fatalf("expected wresp, but got closed channel")
+			}
+		case <-time.After(5 * time.Second):
+			t.Fatalf("compacted watch timed out")
+		}
+		for _, ev := range wresp.Events {
+			if ev.Kv.ModRevision != wRev {
+				t.Fatalf("expected modRev %v, got %+v", wRev, ev)
+			}
+			wRev++
+		}
+		if wresp.Err() == nil {
+			continue
+		}
+		if wresp.Err() != rpctypes.ErrCompacted {
+			t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err())
+		}
+		break
 	}
-	if wresp.Err() != rpctypes.ErrCompacted {
-		t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err())
+	if int(wRev) > numPuts+1 {
+		// got data faster than the compaction
+		return
 	}
-	// ensure the channel is closed
-	if wresp, ok = <-wch; ok {
-		t.Fatalf("expected closed channel, but got %v", wresp)
+	// received compaction error; ensure the channel closes
+	select {
+	case wresp, ok := <-wch:
+		if ok {
+			t.Fatalf("expected closed channel, but got %v", wresp)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatalf("timed out waiting for channel close")
 	}
 }
 
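
To reproduce the original flake locally, the updated test can presumably be run
on its own with the standard Go test runner from the repository root (the exact
invocation is an assumption; etcd's integration suite may also be driven through
its test script):

	go test -v -run TestWatchResumeCompacted ./clientv3/integration/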