|
@@ -19,6 +19,7 @@ import (
|
|
|
"math/rand"
|
|
"math/rand"
|
|
|
"reflect"
|
|
"reflect"
|
|
|
"sort"
|
|
"sort"
|
|
|
|
|
+ "sync"
|
|
|
"testing"
|
|
"testing"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
@@ -569,9 +570,9 @@ func TestV3WatchMultipleEventsFromCurrentRevision(t *testing.T) {
|
|
|
clus := newClusterGRPC(t, &clusterConfig{size: 3})
|
|
clus := newClusterGRPC(t, &clusterConfig{size: 3})
|
|
|
|
|
|
|
|
wAPI := pb.NewWatchClient(clus.RandConn())
|
|
wAPI := pb.NewWatchClient(clus.RandConn())
|
|
|
- wStream, err := wAPI.Watch(context.TODO())
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Fatalf("wAPI.Watch error: %v", err)
|
|
|
|
|
|
|
+ wStream, wErr := wAPI.Watch(context.TODO())
|
|
|
|
|
+ if wErr != nil {
|
|
|
|
|
+ t.Fatalf("wAPI.Watch error: %v", wErr)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}); err != nil {
|
|
if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}); err != nil {
|
|
@@ -641,6 +642,72 @@ func (evs eventsSortByKey) Len() int { return len(evs) }
|
|
|
func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] }
|
|
func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] }
|
|
|
func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }
|
|
func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }
|
|
|
|
|
|
|
|
|
|
+// TestV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
|
|
|
|
|
+func TestV3WatchMultipleStreams(t *testing.T) {
|
|
|
|
|
+ clus := newClusterGRPC(t, &clusterConfig{size: 3})
|
|
|
|
|
+ wAPI := pb.NewWatchClient(clus.RandConn())
|
|
|
|
|
+ kvc := pb.NewKVClient(clus.RandConn())
|
|
|
|
|
+
|
|
|
|
|
+ streams := make([]pb.Watch_WatchClient, 5)
|
|
|
|
|
+ for i := range streams {
|
|
|
|
|
+ wStream, errW := wAPI.Watch(context.TODO())
|
|
|
|
|
+ if errW != nil {
|
|
|
|
|
+ t.Fatalf("wAPI.Watch error: %v", errW)
|
|
|
|
|
+ }
|
|
|
|
|
+ if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}}); err != nil {
|
|
|
|
|
+ t.Fatalf("wStream.Send error: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ streams[i] = wStream
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ for _, wStream := range streams {
|
|
|
|
|
+ wresp, err := wStream.Recv()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ t.Fatalf("wStream.Recv error: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ if !wresp.Created {
|
|
|
|
|
+ t.Fatalf("wresp.Created got = %v, want = true", wresp.Created)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
|
|
|
|
|
+ t.Fatalf("couldn't put key (%v)", err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
|
|
+ wg.Add(len(streams))
|
|
|
|
|
+ wevents := []*storagepb.Event{
|
|
|
|
|
+ {
|
|
|
|
|
+ Type: storagepb.PUT,
|
|
|
|
|
+ Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
|
|
|
|
|
+ },
|
|
|
|
|
+ }
|
|
|
|
|
+ for i := range streams {
|
|
|
|
|
+ go func(i int) {
|
|
|
|
|
+ defer wg.Done()
|
|
|
|
|
+ wStream := streams[i]
|
|
|
|
|
+ wresp, err := wStream.Recv()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ t.Fatalf("wStream.Recv error: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ if wresp.WatchId != 0 {
|
|
|
|
|
+ t.Errorf("watchId got = %d, want = 0", wresp.WatchId)
|
|
|
|
|
+ }
|
|
|
|
|
+ if !reflect.DeepEqual(wresp.Events, wevents) {
|
|
|
|
|
+ t.Errorf("wresp.Events got = %+v, want = %+v", wresp.Events, wevents)
|
|
|
|
|
+ }
|
|
|
|
|
+ // now Recv should block because there is no more events coming
|
|
|
|
|
+ rok, nr := WaitResponse(wStream, 1*time.Second)
|
|
|
|
|
+ if !rok {
|
|
|
|
|
+ t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
|
|
|
|
|
+ }
|
|
|
|
|
+ }(i)
|
|
|
|
|
+ }
|
|
|
|
|
+ wg.Wait()
|
|
|
|
|
+
|
|
|
|
|
+ clus.Terminate(t)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// WaitResponse waits on the given stream for given duration.
|
|
// WaitResponse waits on the given stream for given duration.
|
|
|
// If there is no more events, true and a nil response will be
|
|
// If there is no more events, true and a nil response will be
|
|
|
// returned closing the WatchClient stream. Or the response will
|
|
// returned closing the WatchClient stream. Or the response will
|