Browse Source

integration: test client watchers with overlapped context cancels

Anthony Romano 9 years ago
parent
commit
deef16b376
1 changed files with 60 additions and 0 deletions
  1. 60 0
      clientv3/integration/watch_test.go

+ 60 - 0
clientv3/integration/watch_test.go

@@ -759,3 +759,63 @@ func TestWatchCancelOnServer(t *testing.T) {
 		t.Fatalf("expected 0 watchers, got %q", watchers)
 		t.Fatalf("expected 0 watchers, got %q", watchers)
 	}
 	}
 }
 }
+
+// TestWatchOverlapContextCancel stresses the watcher stream teardown path by
+// creating/canceling watchers to ensure that new watchers are not taken down
+// by a torn down watch stream. The sort of race that's being detected:
+//     1. create w1 using a cancelable ctx with %v as "ctx"
+//     2. cancel ctx
+//     3. watcher client begins tearing down watcher grpc stream since no more watchers
+//     3. start creating watcher w2 using a new "ctx" (not canceled), attaches to old grpc stream
+//     4. watcher client finishes tearing down stream on "ctx"
+//     5. w2 comes back canceled
+func TestWatchOverlapContextCancel(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.RandClient()
+	if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
+		t.Fatal(err)
+	}
+
+	// each unique context "%v" has a unique grpc stream
+	n := 100
+	ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
+	for i := range ctxs {
+		// make "%v" unique
+		ctxs[i] = context.WithValue(context.TODO(), "key", i)
+		// limits the maximum number of outstanding watchers per stream
+		ctxc[i] = make(chan struct{}, 2)
+	}
+	ch := make(chan struct{}, n)
+	// issue concurrent watches with cancel
+	for i := 0; i < n; i++ {
+		go func() {
+			defer func() { ch <- struct{}{} }()
+			idx := rand.Intn(len(ctxs))
+			ctx, cancel := context.WithCancel(ctxs[idx])
+			ctxc[idx] <- struct{}{}
+			ch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
+			if _, ok := <-ch; !ok {
+				t.Fatalf("unexpected closed channel")
+			}
+			// randomize how cancel overlaps with watch creation
+			if rand.Intn(2) == 0 {
+				<-ctxc[idx]
+				cancel()
+			} else {
+				cancel()
+				<-ctxc[idx]
+			}
+		}()
+	}
+	// join on watches
+	for i := 0; i < n; i++ {
+		select {
+		case <-ch:
+		case <-time.After(time.Second):
+			t.Fatalf("timed out waiting for completed watch")
+		}
+	}
+}