Browse Source

Merge pull request #4235 from gyuho/watch_multi_synced

integration: watch test for multi-events with txn
Gyu-Ho Lee 10 years ago
parent
commit
d2e35f68f9
1 changed files with 81 additions and 0 deletions
  1. 81 0
      integration/v3_grpc_test.go

+ 81 - 0
integration/v3_grpc_test.go

@@ -14,8 +14,11 @@
 package integration
 package integration
 
 
 import (
 import (
+	"bytes"
+	"fmt"
 	"math/rand"
 	"math/rand"
 	"reflect"
 	"reflect"
+	"sort"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
@@ -560,6 +563,84 @@ func TestV3WatchMultiple(t *testing.T) {
 	clus.Terminate(t)
 	clus.Terminate(t)
 }
 }
 
 
+// TestV3WatchMultipleEventsFromCurrentRevision tests Watch APIs from current revision
+// in cases it receives multiple events.
+func TestV3WatchMultipleEventsFromCurrentRevision(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+
+	wAPI := pb.NewWatchClient(clus.RandConn())
+	wStream, err := wAPI.Watch(context.TODO())
+	if err != nil {
+		t.Fatalf("wAPI.Watch error: %v", err)
+	}
+
+	if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}); err != nil {
+		t.Fatalf("wStream.Send error: %v", err)
+	}
+
+	kvc := pb.NewKVClient(clus.RandConn())
+	txn := pb.TxnRequest{}
+	for i := 0; i < 3; i++ {
+		ru := &pb.RequestUnion{}
+		ru.RequestPut = &pb.PutRequest{Key: []byte(fmt.Sprintf("foo%d", i)), Value: []byte("bar")}
+		txn.Success = append(txn.Success, ru)
+	}
+
+	tresp, err := kvc.Txn(context.Background(), &txn)
+	if err != nil {
+		t.Fatalf("kvc.Txn error: %v", err)
+	}
+	if !tresp.Succeeded {
+		t.Fatalf("kvc.Txn failed: %+v", tresp)
+	}
+
+	events := []*storagepb.Event{}
+	for len(events) < 3 {
+		resp, err := wStream.Recv()
+		if err != nil {
+			t.Errorf("wStream.Recv error: %v", err)
+		}
+		if resp.Created {
+			continue
+		}
+		events = append(events, resp.Events...)
+	}
+	sort.Sort(eventsSortByKey(events))
+
+	wevents := []*storagepb.Event{
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
+		},
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
+		},
+	}
+
+	if !reflect.DeepEqual(events, wevents) {
+		t.Errorf("events got = %+v, want = %+v", events, wevents)
+	}
+
+	rok, nr := WaitResponse(wStream, 1*time.Second)
+	if !rok {
+		t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
+	}
+
+	// can't defer because tcp ports will be in use
+	clus.Terminate(t)
+}
+
+type eventsSortByKey []*storagepb.Event
+
+func (evs eventsSortByKey) Len() int           { return len(evs) }
+func (evs eventsSortByKey) Swap(i, j int)      { evs[i], evs[j] = evs[j], evs[i] }
+func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }
+
 // WaitResponse waits on the given stream for given duration.
 // WaitResponse waits on the given stream for given duration.
 // If there is no more events, true and a nil response will be
 // If there is no more events, true and a nil response will be
 // returned closing the WatchClient stream. Or the response will
 // returned closing the WatchClient stream. Or the response will