clientv3: Stop expecting retry in integration tests with new grpc balancer

Joe Betz 7 years ago
parent commit 1f6548b751
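
The common shape of these test changes: instead of expecting the first request after a fault to fail and the second to succeed (the old pin-and-retry balancer behavior), tests now either call the new integration.WaitClientV3 helper before asserting, or loop the operation a bounded number of times and treat transient errors as expected while the round robin balancer works its way to a healthy endpoint. A condensed sketch of the loop form, with op, cli and errExpected as defined in the tests below:

	var err error
	for i := 0; i < 5; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		err = op(cli, ctx)
		cancel()
		if err == nil {
			break
		}
		if err == errExpected {
			// transient failure while the balancer is still converging
			t.Logf("#%d: current error %v", i, err)
			continue
		}
		t.Errorf("#%d: failed with error %v", i, err)
	}
	if err != nil {
		t.Fatal(err)
	}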

+ 9 - 8
clientv3/integration/black_hole_test.go

@@ -192,22 +192,23 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
 	// blackhole eps[0]
 	clus.Members[0].Blackhole()
 
-	// fail first due to blackhole, retry should succeed
+	// With the round robin balancer, the client will reach a healthy endpoint
+	// within a few requests.
 	// TODO: first operation can succeed
 	// when gRPC supports better retry on non-delivered request
-	for i := 0; i < 2; i++ {
+	for i := 0; i < 5; i++ {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 		err = op(cli, ctx)
 		cancel()
 		if err == nil {
 			break
-		}
-		if i == 0 {
-			if err != errExpected {
-				t.Errorf("#%d: expected %v, got %v", i, errExpected, err)
-			}
-		} else if err != nil {
+		} else if err == errExpected {
+			t.Logf("#%d: current error %v", i, err)
+		} else {
 			t.Errorf("#%d: failed with error %v", i, err)
 		}
 	}
+	if err != nil {
+		t.Fatal(err)
+	}
 }

+ 4 - 0
clientv3/integration/dial_test.go

@@ -156,6 +156,10 @@ func TestSwitchSetEndpoints(t *testing.T) {
 	clus.Members[0].InjectPartition(t, clus.Members[1:]...)
 
 	cli.SetEndpoints(eps...)
+
+	// TODO: Remove wait once the new grpc load balancer provides retry.
+	integration.WaitClientV3(t, cli)
+
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
 	if _, err := cli.Get(ctx, "foo"); err != nil {

+ 15 - 8
clientv3/integration/kv_test.go

@@ -438,12 +438,15 @@ func TestKVGetErrConnClosed(t *testing.T) {
 
 	cli := clus.Client(0)
 
+	// TODO: Remove wait once the new grpc load balancer provides retry.
+	integration.WaitClientV3(t, cli)
+
 	donec := make(chan struct{})
 	go func() {
 		defer close(donec)
 		_, err := cli.Get(context.TODO(), "foo")
-		if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing {
-			t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
+		if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
+			t.Fatalf("expected %v, %v or server unavailable, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
 		}
 	}()
 
@@ -686,6 +689,8 @@ func TestKVGetRetry(t *testing.T) {
 
 	donec := make(chan struct{})
 	go func() {
+		// TODO: Remove wait once the new grpc load balancer provides retry.
+		integration.WaitClientV3(t, kv)
 		// Get will fail, but reconnect will trigger
 		gresp, gerr := kv.Get(ctx, "foo")
 		if gerr != nil {
@@ -711,7 +716,7 @@ func TestKVGetRetry(t *testing.T) {
 	clus.Members[fIdx].WaitOK(t)
 
 	select {
-	case <-time.After(5 * time.Second):
+	case <-time.After(20 * time.Second):
 		t.Fatalf("timed out waiting for get")
 	case <-donec:
 	}
@@ -736,6 +741,8 @@ func TestKVPutFailGetRetry(t *testing.T) {
 
 	donec := make(chan struct{})
 	go func() {
+		// TODO: Remove wait once the new grpc load balancer provides retry.
+		integration.WaitClientV3(t, kv)
 		// Get will fail, but reconnect will trigger
 		gresp, gerr := kv.Get(context.TODO(), "foo")
 		if gerr != nil {
@@ -751,7 +758,7 @@ func TestKVPutFailGetRetry(t *testing.T) {
 	clus.Members[0].Restart(t)
 
 	select {
-	case <-time.After(5 * time.Second):
+	case <-time.After(20 * time.Second):
 		t.Fatalf("timed out waiting for get")
 	case <-donec:
 	}
@@ -793,7 +800,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) {
 	// this Get fails and triggers an asynchronous connection retry
 	_, err := cli.Get(ctx, "abc")
 	cancel()
-	if err != nil && !isServerUnavailable(err) {
+	if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
 		t.Fatal(err)
 	}
 }
@@ -815,15 +822,15 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
 	// grpc finds out the original connection is down due to the member shutdown.
 	_, err := cli.Get(ctx, "abc")
 	cancel()
-	if err != nil && !isServerUnavailable(err) {
+	if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
 		t.Fatal(err)
 	}
 
-	ctx, cancel = context.WithTimeout(context.TODO(), time.Second) // TODO: How was this test not consistently failing with context canceled errors?
+	ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
 	// this Put fails and triggers an asynchronous connection retry
 	_, err = cli.Put(ctx, "abc", "123")
 	cancel()
-	if err != nil && !isServerUnavailable(err) {
+	if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
 		t.Fatal(err)
 	}
 }
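
The isServerUnavailable, isCanceled and isClientTimeout predicates used above are small error classifiers in the clientv3 integration test package. A hedged sketch of the kind of check involved for the Unavailable case, not necessarily the repo's exact implementation (imports "google.golang.org/grpc/codes" and "google.golang.org/grpc/status" assumed):

	func isServerUnavailable(err error) bool {
		if err == nil {
			return false
		}
		s, ok := status.FromError(err)
		return ok && s.Code() == codes.Unavailable
	}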

+ 6 - 6
clientv3/integration/lease_test.go

@@ -292,10 +292,10 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
 	go func() {
 		defer close(donec)
 		_, err := cli.Grant(context.TODO(), 5)
-		if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled {
+		if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled && !isServerUnavailable(err) {
 			// grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close.
 			// context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close.
-			t.Fatalf("expected %v or %v, got %v", grpc.ErrClientConnClosing, context.Canceled, err)
+			t.Fatalf("expected %v, %v or server unavailable, got %v", grpc.ErrClientConnClosing, context.Canceled, err)
 		}
 	}()
 
@@ -324,8 +324,8 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {
 
 	donec := make(chan struct{})
 	go func() {
-		if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing {
-			t.Fatalf("expected %v or %v, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
+		if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
+			t.Fatalf("expected %v, %v or server unavailable, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
 		}
 		close(donec)
 	}()
@@ -356,8 +356,8 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
 
 	donec := make(chan struct{})
 	go func() {
-		if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing {
-			t.Fatalf("expected %v or %v, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
+		if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
+			t.Fatalf("expected %v, %v or server unavailable, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
 		}
 		close(donec)
 	}()

+ 13 - 4
clientv3/integration/leasing_test.go

@@ -869,6 +869,9 @@ func TestLeasingTxnCancel(t *testing.T) {
 	}
 	clus.Members[0].Stop(t)
 
+	// TODO: Remove wait once the new grpc load balancer provides retry.
+	integration.WaitClientV3(t, clus.Client(1))
+
 	// wait for leader election, if any
 	if _, err = clus.Client(1).Get(context.TODO(), "abc"); err != nil {
 		t.Fatal(err)
@@ -1533,6 +1536,9 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) {
 		}
 	}
 
+	// TODO: Remove wait once the new grpc load balancer provides retry.
+	integration.WaitClientV3(t, lkv)
+
 	lresp, lerr := lkv.Get(context.TODO(), "k")
 	if lerr != nil {
 		t.Fatal(lerr)
@@ -1814,6 +1820,9 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) {
 	// lkv shouldn't need to call out to server for updated leased keys
 	clus.Members[0].Stop(t)
 
+	// TODO: Remove wait once the new grpc load balancer provides retry.
+	integration.WaitClientV3(t, clus.Client(1))
+
 	for i := 0; i < n; i++ {
 		k := fmt.Sprintf("tree/%d", i)
 		lkvResp, err := lkv.Get(context.TODO(), k)
@@ -1905,7 +1914,7 @@ func TestLeasingSessionExpire(t *testing.T) {
 	}
 	waitForExpireAck(t, lkv)
 	clus.Members[0].Restart(t)
-
+	integration.WaitClientV3(t, lkv2)
 	if _, err = lkv2.Put(context.TODO(), "abc", "def"); err != nil {
 		t.Fatal(err)
 	}
@@ -1985,8 +1994,8 @@ func TestLeasingSessionExpireCancel(t *testing.T) {
 
 			select {
 			case err := <-errc:
-				if err != ctx.Err() {
-					t.Errorf("#%d: expected %v, got %v", i, ctx.Err(), err)
+				if !(err == ctx.Err() || isServerUnavailable(err)) {
+					t.Errorf("#%d: expected %v or server unavailable, got %v", i, ctx.Err(), err)
 				}
 			case <-time.After(5 * time.Second):
 				t.Errorf("#%d: timed out waiting for cancel", i)
@@ -2016,7 +2025,7 @@ func waitForExpireAck(t *testing.T, kv clientv3.KV) {
 		ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
 		_, err := kv.Get(ctx, "abc")
 		cancel()
-		if err == ctx.Err() {
+		if err == ctx.Err() || isServerUnavailable(err) {
 			return
 		} else if err != nil {
 			t.Logf("current error: %v", err)

+ 3 - 1
clientv3/integration/maintenance_test.go

@@ -156,9 +156,11 @@ func TestMaintenanceSnapshotErrorInflight(t *testing.T) {
 	b.Close()
 	clus.Members[0].Restart(t)
 
+	cli := clus.RandClient()
+	integration.WaitClientV3(t, cli)
 	// reading snapshot with canceled context should error out
 	ctx, cancel := context.WithCancel(context.Background())
-	rc1, err := clus.RandClient().Snapshot(ctx)
+	rc1, err := cli.Snapshot(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 17 - 9
clientv3/integration/network_partition_test.go

@@ -73,6 +73,9 @@ func TestBalancerUnderNetworkPartitionTxn(t *testing.T) {
 func TestBalancerUnderNetworkPartitionLinearizableGetWithLongTimeout(t *testing.T) {
 	testBalancerUnderNetworkPartition(t, func(cli *clientv3.Client, ctx context.Context) error {
 		_, err := cli.Get(ctx, "a")
+		if err == rpctypes.ErrTimeout {
+			return errExpected
+		}
 		return err
 	}, 7*time.Second)
 }
@@ -128,7 +131,7 @@ func testBalancerUnderNetworkPartition(t *testing.T, op func(*clientv3.Client, c
 	time.Sleep(time.Second * 2)
 	clus.Members[0].InjectPartition(t, clus.Members[1:]...)
 
-	for i := 0; i < 2; i++ {
+	for i := 0; i < 5; i++ {
 		ctx, cancel := context.WithTimeout(context.Background(), timeout)
 		err = op(cli, ctx)
 		cancel()
@@ -168,7 +171,7 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T
 
 	cli, err := clientv3.New(clientv3.Config{
 		Endpoints:   []string{eps[(lead+1)%2]},
-		DialTimeout: 1 * time.Second,
+		DialTimeout: 2 * time.Second,
 		DialOptions: []grpc.DialOption{grpc.WithBlock()},
 	})
 	if err != nil {
@@ -176,9 +179,6 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T
 	}
 	defer cli.Close()
 
-	// wait for non-leader to be pinned
-	mustWaitPinReady(t, cli)
-
 	// add all eps to list, so that when the original pinned one fails
 	// the client can switch to other available eps
 	cli.SetEndpoints(eps[lead], eps[(lead+1)%2])
@@ -186,10 +186,18 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T
 	// isolate leader
 	clus.Members[lead].InjectPartition(t, clus.Members[(lead+1)%3], clus.Members[(lead+2)%3])
 
-	// expects balancer endpoint switch while ongoing leader election
-	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
-	_, err = cli.Get(ctx, "a")
-	cancel()
+	// TODO: Remove wait once the new grpc load balancer provides retry.
+	integration.WaitClientV3(t, cli)
+
+	// expects balancer to round robin to leader within two attempts
+	for i := 0; i < 2; i++ {
+		ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+		_, err = cli.Get(ctx, "a")
+		cancel()
+		if err == nil {
+			break
+		}
+	}
 	if err != nil {
 		t.Fatal(err)
 	}

+ 11 - 1
clientv3/integration/server_shutdown_test.go

@@ -339,7 +339,17 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
 		defer close(donec)
 		ctx, cancel := context.WithTimeout(context.TODO(), clientTimeout)
 		readyc <- struct{}{}
-		_, err := cli.Get(ctx, "abc", gops...)
+
+		// TODO: The new grpc load balancer will not pin to an endpoint
+		// as intended by this test, but it will round robin to another member
+		// within two attempts.
+		// Remove retry loop once the new grpc load balancer provides retry.
+		for i := 0; i < 2; i++ {
+			_, err = cli.Get(ctx, "abc", gops...)
+			if err == nil {
+				break
+			}
+		}
 		cancel()
 		if err != nil {
 			if linearizable && isServerUnavailable(err) {

+ 4 - 1
clientv3/integration/txn_test.go

@@ -79,6 +79,9 @@ func TestTxnWriteFail(t *testing.T) {
 			t.Fatalf("timed out waiting for txn fail")
 		case <-txnc:
 		}
+		// TODO: Remove wait once the new grpc load balancer provides retry.
+		integration.WaitClientV3(t, kv)
+
 		// and ensure the put didn't take
 		gresp, gerr := clus.Client(1).Get(context.TODO(), "foo")
 		if gerr != nil {
@@ -90,7 +93,7 @@ func TestTxnWriteFail(t *testing.T) {
 	}()
 
 	select {
-	case <-time.After(2 * clus.Members[1].ServerConfig.ReqTimeout()):
+	case <-time.After(5 * clus.Members[1].ServerConfig.ReqTimeout()):
 		t.Fatalf("timed out waiting for get")
 	case <-getc:
 	}

+ 5 - 6
clientv3/options.go

@@ -21,12 +21,11 @@ import (
 )
 
 var (
-	// Disable gRPC internal retrial logic
-	// TODO: enable when gRPC retry is stable (FailFast=false)
-	// Reference:
-	//  - https://github.com/grpc/grpc-go/issues/1532
-	//  - https://github.com/grpc/proposal/blob/master/A6-client-retries.md
-	defaultFailFast = grpc.FailFast(true)
+	// Client-side retry of request failures where data was not written to the wire or
+	// where the server indicates it did not process the data. The gRPC default is "FailFast(true)",
+	// but for etcd we default to "FailFast(false)" to minimize client request errors due to
+	// transient failures.
+	defaultFailFast = grpc.FailFast(false)
 
 	// client-side request send limit, gRPC default is math.MaxInt32
 	// Make sure that "client-side send limit < server-side default send/recv limit"
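
For context, grpc.FailFast is a per-RPC gRPC call option; defaultFailFast above is merged into the client's default call options. A hedged sketch of what the flag changes at the gRPC layer; kvc, ctx and the request literal are illustrative, not part of this diff:

	// With FailFast(false) ("wait for ready"), an RPC issued while no connection
	// is ready blocks until one becomes ready or ctx expires, instead of failing
	// immediately with codes.Unavailable.
	if _, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")}, grpc.FailFast(false)); err != nil {
		// Typically a context deadline here rather than an immediate Unavailable.
		return err
	}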

+ 16 - 0
integration/cluster.go

@@ -974,7 +974,23 @@ func (m *member) WaitStarted(t *testing.T) {
 		cancel()
 		break
 	}
+}
 
+func WaitClientV3(t *testing.T, kv clientv3.KV) {
+	timeout := time.Now().Add(requestTimeout)
+	var err error
+	for time.Now().Before(timeout) {
+		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+		_, err = kv.Get(ctx, "/")
+		cancel()
+		if err == nil {
+			return
+		}
+		time.Sleep(tickDuration)
+	}
+	if err != nil {
+		t.Fatalf("timed out waiting for client: %v", err)
+	}
 }
 
 func (m *member) URL() string { return m.ClientURLs[0].String() }
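
Call sites in the clientv3 integration tests above use the helper as a pre-flight wait before the first assertion; a minimal usage sketch mirroring those call sites:

	cli := clus.Client(0)
	// TODO: Remove wait once the new grpc load balancer provides retry.
	integration.WaitClientV3(t, cli)
	if _, err := cli.Get(context.TODO(), "foo"); err != nil {
		t.Fatal(err)
	}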

+ 15 - 2
integration/v3_grpc_test.go

@@ -32,7 +32,9 @@ import (
 	"github.com/coreos/etcd/pkg/transport"
 
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
 )
 
 // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
@@ -1934,7 +1936,18 @@ func eqErrGRPC(err1 error, err2 error) bool {
 // FailFast=false works with Put.
 func waitForRestart(t *testing.T, kvc pb.KVClient) {
 	req := &pb.RangeRequest{Key: []byte("_"), Serializable: true}
-	if _, err := kvc.Range(context.TODO(), req, grpc.FailFast(false)); err != nil {
-		t.Fatal(err)
+	// TODO: Remove retry loop once the new grpc load balancer provides retry.
+	var err error
+	for i := 0; i < 10; i++ {
+		if _, err = kvc.Range(context.TODO(), req, grpc.FailFast(false)); err == nil {
+			break
+		}
+		if status, ok := status.FromError(err); !ok || status.Code() != codes.Unavailable {
+			t.Fatal(err)
+		}
+		time.Sleep(time.Millisecond * 250)
+	}
+	if err != nil {
+		t.Fatalf("timed out waiting for restart: %v", err)
 	}
 }