Procházet zdrojové kódy

Merge pull request #4237 from gyuho/multi_stream

integration: add TestV3WatchMultipleStreams
Gyu-Ho Lee před 10 roky
rodič
revize
72195afbc9
1 změnil soubory, kde provedl 70 přidání a 3 odebrání
  1. 70 3
      integration/v3_grpc_test.go

+ 70 - 3
integration/v3_grpc_test.go

@@ -19,6 +19,7 @@ import (
 	"math/rand"
 	"reflect"
 	"sort"
+	"sync"
 	"testing"
 	"time"
 
@@ -569,9 +570,9 @@ func TestV3WatchMultipleEventsFromCurrentRevision(t *testing.T) {
 	clus := newClusterGRPC(t, &clusterConfig{size: 3})
 
 	wAPI := pb.NewWatchClient(clus.RandConn())
-	wStream, err := wAPI.Watch(context.TODO())
-	if err != nil {
-		t.Fatalf("wAPI.Watch error: %v", err)
+	wStream, wErr := wAPI.Watch(context.TODO())
+	if wErr != nil {
+		t.Fatalf("wAPI.Watch error: %v", wErr)
 	}
 
 	if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}); err != nil {
@@ -641,6 +642,72 @@ func (evs eventsSortByKey) Len() int           { return len(evs) }
 func (evs eventsSortByKey) Swap(i, j int)      { evs[i], evs[j] = evs[j], evs[i] }
 func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }
 
+// TestV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
+func TestV3WatchMultipleStreams(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	wAPI := pb.NewWatchClient(clus.RandConn())
+	kvc := pb.NewKVClient(clus.RandConn())
+
+	streams := make([]pb.Watch_WatchClient, 5)
+	for i := range streams {
+		wStream, errW := wAPI.Watch(context.TODO())
+		if errW != nil {
+			t.Fatalf("wAPI.Watch error: %v", errW)
+		}
+		if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}}); err != nil {
+			t.Fatalf("wStream.Send error: %v", err)
+		}
+		streams[i] = wStream
+	}
+
+	for _, wStream := range streams {
+		wresp, err := wStream.Recv()
+		if err != nil {
+			t.Fatalf("wStream.Recv error: %v", err)
+		}
+		if !wresp.Created {
+			t.Fatalf("wresp.Created got = %v, want = true", wresp.Created)
+		}
+	}
+
+	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
+		t.Fatalf("couldn't put key (%v)", err)
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(len(streams))
+	wevents := []*storagepb.Event{
+		{
+			Type: storagepb.PUT,
+			Kv:   &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
+		},
+	}
+	for i := range streams {
+		go func(i int) {
+			defer wg.Done()
+			wStream := streams[i]
+			wresp, err := wStream.Recv()
+			if err != nil {
+				t.Fatalf("wStream.Recv error: %v", err)
+			}
+			if wresp.WatchId != 0 {
+				t.Errorf("watchId got = %d, want = 0", wresp.WatchId)
+			}
+			if !reflect.DeepEqual(wresp.Events, wevents) {
+				t.Errorf("wresp.Events got = %+v, want = %+v", wresp.Events, wevents)
+			}
+			// now Recv should block because there is no more events coming
+			rok, nr := WaitResponse(wStream, 1*time.Second)
+			if !rok {
+				t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
+			}
+		}(i)
+	}
+	wg.Wait()
+
+	clus.Terminate(t)
+}
+
 // WaitResponse waits on the given stream for given duration.
 // If there is no more events, true and a nil response will be
 // returned closing the WatchClient stream. Or the response will