|
@@ -961,3 +961,22 @@ func TestWatchStressResumeClose(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
clus.TakeClient(0)
|
|
clus.TakeClient(0)
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+// TestWatchCancelDisconnected ensures canceling a watcher works when
|
|
|
|
|
+// its grpc stream is disconnected / reconnecting.
|
|
|
|
|
+func TestWatchCancelDisconnected(t *testing.T) {
|
|
|
|
|
+ defer testutil.AfterTest(t)
|
|
|
|
|
+ clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
|
|
|
|
+ defer clus.Terminate(t)
|
|
|
|
|
+ cli := clus.Client(0)
|
|
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
+ // add more watches than can be resumed before the cancel
|
|
|
|
|
+ wch := cli.Watch(ctx, "abc")
|
|
|
|
|
+ clus.Members[0].Stop(t)
|
|
|
|
|
+ cancel()
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-wch:
|
|
|
|
|
+ case <-time.After(time.Second):
|
|
|
|
|
+ t.Fatal("took too long to cancel disconnected watcher")
|
|
|
|
|
+ }
|
|
|
|
|
+}
|