|
|
@@ -621,3 +621,55 @@ func TestWatchAfterClose(t *testing.T) {
|
|
|
case <-donec:
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+// TestWatchWithRequireLeader checks the watch channel closes when no leader.
|
|
|
+func TestWatchWithRequireLeader(t *testing.T) {
|
|
|
+ defer testutil.AfterTest(t)
|
|
|
+
|
|
|
+ clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
|
|
+ defer clus.Terminate(t)
|
|
|
+
|
|
|
+ // something for the non-require leader watch to read as an event
|
|
|
+ if _, err := clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ clus.Members[1].Stop(t)
|
|
|
+ clus.Members[2].Stop(t)
|
|
|
+ clus.Client(1).Close()
|
|
|
+ clus.Client(2).Close()
|
|
|
+ clus.TakeClient(1)
|
|
|
+ clus.TakeClient(2)
|
|
|
+
|
|
|
+ // wait for election timeout, then member[0] will not have a leader.
|
|
|
+ tickDuration := 10 * time.Millisecond
|
|
|
+ time.Sleep(time.Duration(3*clus.Members[0].ElectionTicks) * tickDuration)
|
|
|
+
|
|
|
+ chLeader := clus.Client(0).Watch(clientv3.WithRequireLeader(context.TODO()), "foo", clientv3.WithRev(1))
|
|
|
+ chNoLeader := clus.Client(0).Watch(context.TODO(), "foo", clientv3.WithRev(1))
|
|
|
+
|
|
|
+ select {
|
|
|
+ case resp, ok := <-chLeader:
|
|
|
+ if !ok {
|
|
|
+ t.Fatalf("expected %v watch channel, got closed channel", rpctypes.ErrNoLeader)
|
|
|
+ }
|
|
|
+ if resp.Err() != rpctypes.ErrNoLeader {
|
|
|
+ t.Fatalf("expected %v watch response error, got %+v", rpctypes.ErrNoLeader, resp)
|
|
|
+ }
|
|
|
+ case <-time.After(3 * time.Second):
|
|
|
+ t.Fatal("watch without leader took too long to close")
|
|
|
+ }
|
|
|
+
|
|
|
+ select {
|
|
|
+ case resp, ok := <-chLeader:
|
|
|
+ if ok {
|
|
|
+ t.Fatalf("expected closed channel, got response %v", resp)
|
|
|
+ }
|
|
|
+ case <-time.After(3 * time.Second):
|
|
|
+ t.Fatal("waited too long for channel to close")
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, ok := <-chNoLeader; !ok {
|
|
|
+ t.Fatalf("expected response, got closed channel")
|
|
|
+ }
|
|
|
+}
|