Browse Source

clientv3/integration: test watch resume with disconnect before first event

Anthony Romano 8 years ago
parent
commit
ec470944f8
1 changed files with 51 additions and 1 deletions
  1. 51 1
      clientv3/integration/watch_test.go

+ 51 - 1
clientv3/integration/watch_test.go

@@ -343,7 +343,57 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
 	}
 }
 
-// TestWatchResumeComapcted checks that the watcher gracefully closes in case
+func TestWatchResumeInitRev(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+	if _, err := cli.Put(context.TODO(), "b", "2"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := cli.Put(context.TODO(), "a", "3"); err != nil {
+		t.Fatal(err)
+	}
+	// if resume is broken, it'll pick up this key first instead of a=3
+	if _, err := cli.Put(context.TODO(), "a", "4"); err != nil {
+		t.Fatal(err)
+	}
+
+	wch := clus.Client(0).Watch(context.Background(), "a", clientv3.WithRev(1), clientv3.WithCreatedNotify())
+	if resp, ok := <-wch; !ok || resp.Header.Revision != 4 {
+		t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok)
+	}
+	// pause wch
+	clus.Members[0].DropConnections()
+	clus.Members[0].PauseConnections()
+
+	select {
+	case resp, ok := <-wch:
+		t.Skipf("wch should block, got (%+v, %v); drop not fast enough", resp, ok)
+	case <-time.After(100 * time.Millisecond):
+	}
+
+	// resume wch
+	clus.Members[0].UnpauseConnections()
+
+	select {
+	case resp, ok := <-wch:
+		if !ok {
+			t.Fatal("unexpected watch close")
+		}
+		if len(resp.Events) == 0 {
+			t.Fatal("expected event on watch")
+		}
+		if string(resp.Events[0].Kv.Value) != "3" {
+			t.Fatalf("expected value=3, got event %+v", resp.Events[0])
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("watch timed out")
+	}
+}
+
+// TestWatchResumeCompacted checks that the watcher gracefully closes in case
 // that it tries to resume to a revision that's been compacted out of the store.
 // Since the watcher's server restarts with stale data, the watcher will receive
 // either a compaction error or all keys by staying in sync before the compaction