Browse Source

Merge pull request #8772 from gyuho/shutdown

clientv3/integration: add TestBalancerUnderServerShutdownMutable*
Gyu-Ho Lee 8 years ago
parent
commit
0160cd76e5
1 changed files with 68 additions and 0 deletions
  1. 68 0
      clientv3/integration/server_shutdown_test.go

+ 68 - 0
clientv3/integration/server_shutdown_test.go

@@ -111,3 +111,71 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
 		t.Fatal("took too long to receive events")
 	}
 }
+
+func TestBalancerUnderServerShutdownPut(t *testing.T) {
+	testBalancerUnderServerShutdownMutable(t, func(cli *clientv3.Client, ctx context.Context) error {
+		_, err := cli.Put(ctx, "foo", "bar")
+		return err
+	})
+}
+
+func TestBalancerUnderServerShutdownDelete(t *testing.T) {
+	testBalancerUnderServerShutdownMutable(t, func(cli *clientv3.Client, ctx context.Context) error {
+		_, err := cli.Delete(ctx, "foo")
+		return err
+	})
+}
+
+func TestBalancerUnderServerShutdownTxn(t *testing.T) {
+	testBalancerUnderServerShutdownMutable(t, func(cli *clientv3.Client, ctx context.Context) error {
+		_, err := cli.Txn(ctx).
+			If(clientv3.Compare(clientv3.Version("foo"), "=", 0)).
+			Then(clientv3.OpPut("foo", "bar")).
+			Else(clientv3.OpPut("foo", "baz")).Commit()
+		return err
+	})
+}
+
+// testBalancerUnderServerShutdownMutable expects that when the member of
+// the pinned endpoint is shut down, the balancer switches its endpoints
+// and all subsequent put/delete/txn requests succeed with new endpoints.
+func testBalancerUnderServerShutdownMutable(t *testing.T, op func(*clientv3.Client, context.Context) error) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
+		Size:               3,
+		SkipCreatingClient: true,
+	})
+	defer clus.Terminate(t)
+
+	eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
+
+	// pin eps[0]
+	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[0]}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cli.Close()
+
+	// wait for eps[0] to be pinned
+	waitPinReady(t, cli)
+
+	// add all eps to list, so that when the original pined one fails
+	// the client can switch to other available eps
+	cli.SetEndpoints(eps...)
+
+	// shut down eps[0]
+	clus.Members[0].Terminate(t)
+
+	// switched to others when eps[0] was explicitly shut down
+	// and following request should succeed
+	// TODO: remove this (expose client connection state?)
+	time.Sleep(time.Second)
+
+	cctx, ccancel := context.WithTimeout(context.Background(), time.Second)
+	err = op(cli, cctx)
+	ccancel()
+	if err != nil {
+		t.Fatal(err)
+	}
+}