Browse Source

integration: test canceling watchers when disconnected

Anthony Romano 9 years ago
parent
commit
9ce398f8a6
1 changed files with 30 additions and 10 deletions
  1. 30 10
      clientv3/integration/watch_test.go

+ 30 - 10
clientv3/integration/watch_test.go

@@ -770,15 +770,22 @@ func TestWatchCancelOnServer(t *testing.T) {
 //     4. watcher client finishes tearing down stream on "ctx"
 //     5. w2 comes back canceled
 func TestWatchOverlapContextCancel(t *testing.T) {
+	f := func(clus *integration.ClusterV3) {}
+	testWatchOverlapContextCancel(t, f)
+}
+
+func TestWatchOverlapDropConnContextCancel(t *testing.T) {
+	f := func(clus *integration.ClusterV3) {
+		clus.Members[0].DropConnections()
+	}
+	testWatchOverlapContextCancel(t, f)
+}
+
+func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) {
 	defer testutil.AfterTest(t)
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	cli := clus.RandClient()
-	if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
-		t.Fatal(err)
-	}
-
 	// each unique context "%v" has a unique grpc stream
 	n := 100
 	ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
@@ -788,17 +795,30 @@ func TestWatchOverlapContextCancel(t *testing.T) {
 		// limits the maximum number of outstanding watchers per stream
 		ctxc[i] = make(chan struct{}, 2)
 	}
+
+	// issue concurrent watches on "abc" with cancel
+	cli := clus.RandClient()
+	if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
+		t.Fatal(err)
+	}
 	ch := make(chan struct{}, n)
-	// issue concurrent watches with cancel
 	for i := 0; i < n; i++ {
 		go func() {
 			defer func() { ch <- struct{}{} }()
 			idx := rand.Intn(len(ctxs))
 			ctx, cancel := context.WithCancel(ctxs[idx])
 			ctxc[idx] <- struct{}{}
-			ch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
-			if _, ok := <-ch; !ok {
-				t.Fatalf("unexpected closed channel")
+			wch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
+			f(clus)
+			select {
+			case _, ok := <-wch:
+				if !ok {
+					t.Fatalf("unexpected closed channel %p", wch)
+				}
+			// may take a second or two to reestablish a watcher because of
+			// grpc backoff policies for disconnects
+			case <-time.After(5 * time.Second):
+				t.Errorf("timed out waiting for watch on %p", wch)
 			}
 			// randomize how cancel overlaps with watch creation
 			if rand.Intn(2) == 0 {
@@ -814,7 +834,7 @@ func TestWatchOverlapContextCancel(t *testing.T) {
 	for i := 0; i < n; i++ {
 		select {
 		case <-ch:
-		case <-time.After(time.Second):
+		case <-time.After(5 * time.Second):
 			t.Fatalf("timed out waiting for completed watch")
 		}
 	}