|
|
@@ -976,3 +976,37 @@ func TestWatchWithProgressNotify(t *testing.T) {
|
|
|
t.Errorf("unexpected pb.WatchResponse is received %+v", resp)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+// TestV3WatcMultiOpenhClose opens many watchers concurrently on multiple streams.
|
|
|
+func TestV3WatchClose(t *testing.T) {
|
|
|
+ defer testutil.AfterTest(t)
|
|
|
+ clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
|
|
+ defer clus.Terminate(t)
|
|
|
+
|
|
|
+ c := clus.RandClient()
|
|
|
+ wapi := toGRPC(c).Watch
|
|
|
+
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ wg.Add(100)
|
|
|
+ for i := 0; i < 100; i++ {
|
|
|
+ go func() {
|
|
|
+ ctx, cancel := context.WithCancel(context.TODO())
|
|
|
+ defer func() {
|
|
|
+ wg.Done()
|
|
|
+ cancel()
|
|
|
+ }()
|
|
|
+ ws, err := wapi.Watch(ctx)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ cr := &pb.WatchCreateRequest{Key: []byte("a")}
|
|
|
+ req := &pb.WatchRequest{
|
|
|
+ RequestUnion: &pb.WatchRequest_CreateRequest{
|
|
|
+ CreateRequest: cr}}
|
|
|
+ ws.Send(req)
|
|
|
+ ws.Recv()
|
|
|
+ }()
|
|
|
+ }
|
|
|
+ c.ActiveConnection().Close()
|
|
|
+ wg.Wait()
|
|
|
+}
|