Browse Source

Merge pull request #4225 from gyuho/watch_test_multi

integration: add TestV3WatchMultiple
Gyu-Ho Lee 10 years ago
parent
commit
98dfdebf13
1 changed files with 113 additions and 0 deletions
  1. 113 0
      integration/v3_grpc_test.go

+ 113 - 0
integration/v3_grpc_test.go

@@ -491,3 +491,116 @@ func TestV3WatchCancel(t *testing.T) {
 
 	clus.Terminate(t)
 }
+
+// TestV3WatchMultiple tests multiple watchers on the same key
+// and one watcher with matching prefix. It first puts the key
+// that matches all watchers, and another key that matches only
+// one watcher to test if it receives expected events.
+func TestV3WatchMultiple(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	wAPI := pb.NewWatchClient(clus.RandConn())
+	kvc := pb.NewKVClient(clus.RandConn())
+
+	wStream, errW := wAPI.Watch(context.TODO())
+	if errW != nil {
+		t.Fatalf("wAPI.Watch error: %v", errW)
+	}
+
+	watchKeyN := 4
+	for i := 0; i < watchKeyN+1; i++ {
+		var wreq *pb.WatchRequest
+		if i < watchKeyN {
+			wreq = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}}
+		} else {
+			wreq = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("fo")}}
+		}
+		if err := wStream.Send(wreq); err != nil {
+			t.Fatalf("wStream.Send error: %v", err)
+		}
+	}
+
+	ids := make(map[int64]struct{})
+	for i := 0; i < watchKeyN+1; i++ {
+		wresp, err := wStream.Recv()
+		if err != nil {
+			t.Fatalf("wStream.Recv error: %v", err)
+		}
+		if !wresp.Created {
+			t.Fatalf("wresp.Created got = %v, want = true", wresp.Created)
+		}
+		ids[wresp.WatchId] = struct{}{}
+	}
+
+	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
+		t.Fatalf("couldn't put key (%v)", err)
+	}
+
+	for i := 0; i < watchKeyN+1; i++ {
+		wresp, err := wStream.Recv()
+		if err != nil {
+			t.Fatalf("wStream.Recv error: %v", err)
+		}
+		if _, ok := ids[wresp.WatchId]; !ok {
+			t.Errorf("watchId %d is not created!", wresp.WatchId)
+		} else {
+			delete(ids, wresp.WatchId)
+		}
+		if len(wresp.Events) == 0 {
+			t.Errorf("#%d: no events received", i)
+		}
+		for _, ev := range wresp.Events {
+			if string(ev.Kv.Key) != "foo" {
+				t.Errorf("ev.Kv.Key got = %s, want = foo", ev.Kv.Key)
+			}
+			if string(ev.Kv.Value) != "bar" {
+				t.Errorf("ev.Kv.Value got = %s, want = bar", ev.Kv.Value)
+			}
+		}
+	}
+
+	// now put one key that has only one matching watcher
+	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("fo"), Value: []byte("bar")}); err != nil {
+		t.Fatalf("couldn't put key (%v)", err)
+	}
+	wresp, err := wStream.Recv()
+	if err != nil {
+		t.Errorf("wStream.Recv error: %v", err)
+	}
+	if len(wresp.Events) != 1 {
+		t.Fatalf("len(wresp.Events) got = %d, want = 1", len(wresp.Events))
+	}
+	if string(wresp.Events[0].Kv.Key) != "fo" {
+		t.Errorf("wresp.Events[0].Kv.Key got = %s, want = fo", wresp.Events[0].Kv.Key)
+	}
+
+	// now Recv should block because there is no more events coming
+	rok, nr := WaitResponse(wStream, 1*time.Second)
+	if !rok {
+		t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
+	}
+
+	clus.Terminate(t)
+}
+
+// WaitResponse waits on the given stream for given duration.
+// If there is no more events, true and a nil response will be
+// returned closing the WatchClient stream. Or the response will
+// be returned.
+func WaitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.WatchResponse) {
+	rCh := make(chan *pb.WatchResponse)
+	go func() {
+		resp, _ := wc.Recv()
+		rCh <- resp
+	}()
+	select {
+	case nr := <-rCh:
+		return false, nr
+	case <-time.After(timeout):
+	}
+	wc.CloseSend()
+	rv, ok := <-rCh
+	if rv != nil || !ok {
+		return false, rv
+	}
+	return true, nil
+}