|
@@ -99,3 +99,69 @@ func testBalancerUnderNetworkPartition(t *testing.T, op func(*clientv3.Client, c
|
|
|
t.Errorf("balancer did not switch in time (%v)", err)
|
|
t.Errorf("balancer did not switch in time (%v)", err)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+func TestBalancerUnderNetworkPartitionWatchLeader(t *testing.T) {
|
|
|
|
|
+ testBalancerUnderNetworkPartitionWatch(t, true)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func TestBalancerUnderNetworkPartitionWatchFollower(t *testing.T) {
|
|
|
|
|
+ testBalancerUnderNetworkPartitionWatch(t, false)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// testBalancerUnderNetworkPartitionWatch ensures watch stream
|
|
|
|
|
+// to a partitioned node be closed when context requires leader.
|
|
|
|
|
+func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
|
|
|
|
|
+ defer testutil.AfterTest(t)
|
|
|
|
|
+
|
|
|
|
|
+ clus := integration.NewClusterV3(t, &integration.ClusterConfig{
|
|
|
|
|
+ Size: 3,
|
|
|
|
|
+ SkipCreatingClient: true,
|
|
|
|
|
+ })
|
|
|
|
|
+ defer clus.Terminate(t)
|
|
|
|
|
+
|
|
|
|
|
+ eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
|
|
|
|
|
+
|
|
|
|
|
+ target := clus.WaitLeader(t)
|
|
|
|
|
+ if !isolateLeader {
|
|
|
|
|
+ target = (target + 1) % 3
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // pin eps[target]
|
|
|
|
|
+ watchCli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}})
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ t.Fatal(err)
|
|
|
|
|
+ }
|
|
|
|
|
+ defer watchCli.Close()
|
|
|
|
|
+
|
|
|
|
|
+ // wait for eps[target] to be pinned
|
|
|
|
|
+ waitPinReady(t, watchCli)
|
|
|
|
|
+
|
|
|
|
|
+ // add all eps to list, so that when the original pined one fails
|
|
|
|
|
+ // the client can switch to other available eps
|
|
|
|
|
+ watchCli.SetEndpoints(eps...)
|
|
|
|
|
+
|
|
|
|
|
+ wch := watchCli.Watch(clientv3.WithRequireLeader(context.Background()), "foo", clientv3.WithCreatedNotify())
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-wch:
|
|
|
|
|
+ case <-time.After(3 * time.Second):
|
|
|
|
|
+ t.Fatal("took too long to create watch")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // isolate eps[target]
|
|
|
|
|
+ clus.Members[target].InjectPartition(t,
|
|
|
|
|
+ clus.Members[(target+1)%3],
|
|
|
|
|
+ clus.Members[(target+2)%3],
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ select {
|
|
|
|
|
+ case ev := <-wch:
|
|
|
|
|
+ if len(ev.Events) != 0 {
|
|
|
|
|
+ t.Fatal("expected no event")
|
|
|
|
|
+ }
|
|
|
|
|
+ if err = ev.Err(); err != rpctypes.ErrNoLeader {
|
|
|
|
|
+ t.Fatalf("expected %v, got %v", rpctypes.ErrNoLeader, err)
|
|
|
|
|
+ }
|
|
|
|
|
+ case <-time.After(3 * time.Second): // enough time to detect leader lost
|
|
|
|
|
+ t.Fatal("took too long to detect leader lost")
|
|
|
|
|
+ }
|
|
|
|
|
+}
|