浏览代码

Merge pull request #4242 from gyuho/unsynced_multi

integration: TestV3WatchMultipleEventsPutUnsynced
Gyu-Ho Lee 10 年之前
父节点
当前提交
cd323e0ec8
共有 1 个文件被更改,包括 74 次插入0 次删除
  1. 74 0
      integration/v3_grpc_test.go

+ 74 - 0
integration/v3_grpc_test.go

@@ -666,6 +666,80 @@ func (evs eventsSortByKey) Len() int           { return len(evs) }
 func (evs eventsSortByKey) Swap(i, j int)      { evs[i], evs[j] = evs[j], evs[i] }
 func (evs eventsSortByKey) Swap(i, j int)      { evs[i], evs[j] = evs[j], evs[i] }
 func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }
 func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }
 
 
+func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+
+	kvc := pb.NewKVClient(clus.RandConn())
+
+	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
+		t.Fatalf("couldn't put key (%v)", err)
+	}
+	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo1"), Value: []byte("bar")}); err != nil {
+		t.Fatalf("couldn't put key (%v)", err)
+	}
+
+	wAPI := pb.NewWatchClient(clus.RandConn())
+	wStream, wErr := wAPI.Watch(context.TODO())
+	if wErr != nil {
+		t.Fatalf("wAPI.Watch error: %v", wErr)
+	}
+
+	if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo"), StartRevision: 1}}); err != nil {
+		t.Fatalf("wStream.Send error: %v", err)
+	}
+
+	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
+		t.Fatalf("couldn't put key (%v)", err)
+	}
+	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo1"), Value: []byte("bar")}); err != nil {
+		t.Fatalf("couldn't put key (%v)", err)
+	}
+
+	allWevents := []*storagepb.Event{
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 4, Version: 2},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 5, Version: 2},
+		},
+	}
+
+	events := []*storagepb.Event{}
+	for len(events) < 4 {
+		resp, err := wStream.Recv()
+		if err != nil {
+			t.Errorf("wStream.Recv error: %v", err)
+		}
+		if resp.Created {
+			continue
+		}
+		events = append(events, resp.Events...)
+		// if PUT requests are committed by now, first receive would return
+		// multiple events, but if not, it returns a single event. In SSD,
+		// it should return 4 events at once.
+	}
+
+	if !reflect.DeepEqual(events, allWevents) {
+		t.Errorf("events got = %+v, want = %+v", events, allWevents)
+	}
+
+	rok, nr := WaitResponse(wStream, 1*time.Second)
+	if !rok {
+		t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
+	}
+}
+
 func TestV3WatchMultipleStreamsSynced(t *testing.T) {
 func TestV3WatchMultipleStreamsSynced(t *testing.T) {
 	testV3WatchMultipleStreams(t, 0)
 	testV3WatchMultipleStreams(t, 0)
 }
 }