
integration, functional: Eliminate direct use of gRPC transport pkg

gRPC has moved the transport package to an internal-only directory. This
commit eliminates direct use of the transport package in the stress test
in favor of the gRPC error code that represents a connection problem.

https://godoc.org/google.golang.org/grpc/internal/transport is the new
location for the package, and its documentation says it is not intended
to be imported directly. Instead, the maintainers suggest using the
Unavailable status code to detect a connection problem.

This change slightly reorganizes the stresser test error handling.
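
For reference, a minimal sketch of the suggested detection, using only the
public status and codes packages; the helper name isConnUnavailable is
illustrative and not part of this commit:

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isConnUnavailable reports whether err carries codes.Unavailable,
// the public signal gRPC uses for connection-level failures.
func isConnUnavailable(err error) bool {
	// status.Convert returns a *status.Status for any error; non-gRPC
	// errors come back as codes.Unknown, so this stays false for them.
	return status.Convert(err).Code() == codes.Unavailable
}

func main() {
	err := status.Error(codes.Unavailable, "transport is closing")
	fmt.Println(isConnUnavailable(err)) // true
}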
Matt Brannock · 7 years ago · commit 520bd5084e
2 changed files with 63 additions and 42 deletions:
  1. functional/tester/stresser_key.go (+58, -39)
  2. integration/v3_grpc_inflight_test.go (+5, -3)

functional/tester/stresser_key.go (+58, -39)

@@ -32,7 +32,8 @@ import (
 	"go.uber.org/zap"
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
 	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/transport"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 )
 
 
 type keyStresser struct {
 type keyStresser struct {
@@ -128,44 +129,7 @@ func (s *keyStresser) run() {
 			continue
 		}

-		switch rpctypes.ErrorDesc(err) {
-		case context.DeadlineExceeded.Error():
-			// This retries when request is triggered at the same time as
-			// leader failure. When we terminate the leader, the request to
-			// that leader cannot be processed, and times out. Also requests
-			// to followers cannot be forwarded to the old leader, so timing out
-			// as well. We want to keep stressing until the cluster elects a
-			// new leader and start processing requests again.
-		case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
-			// This retries when request is triggered at the same time as
-			// leader failure and follower nodes receive time out errors
-			// from losing their leader. Followers should retry to connect
-			// to the new leader.
-		case etcdserver.ErrStopped.Error():
-			// one of the etcd nodes stopped from failure injection
-		case transport.ErrConnClosing.Desc:
-			// server closed the transport (failure injected node)
-		case rpctypes.ErrNotCapable.Error():
-			// capability check has not been done (in the beginning)
-		case rpctypes.ErrTooManyRequests.Error():
-			// hitting the recovering member.
-		case raft.ErrProposalDropped.Error():
-			// removed member, or leadership has changed (old leader got raftpb.MsgProp)
-		case context.Canceled.Error():
-			// from stresser.Cancel method:
-			return
-		case grpc.ErrClientConnClosing.Error():
-			// from stresser.Cancel method:
-			return
-		default:
-			s.lg.Warn(
-				"stress run exiting",
-				zap.String("stress-type", "KV"),
-				zap.String("endpoint", s.m.EtcdClientEndpoint),
-				zap.String("error-type", reflect.TypeOf(err).String()),
-				zap.String("error-desc", rpctypes.ErrorDesc(err)),
-				zap.Error(err),
-			)
+		if !s.isRetryableError(err) {
 			return
 		}

@@ -178,6 +142,61 @@ func (s *keyStresser) run() {
 	}
 }

+func (s *keyStresser) isRetryableError(err error) bool {
+	switch rpctypes.ErrorDesc(err) {
+	// retryable
+	case context.DeadlineExceeded.Error():
+		// This retries when request is triggered at the same time as
+		// leader failure. When we terminate the leader, the request to
+		// that leader cannot be processed, and times out. Also requests
+		// to followers cannot be forwarded to the old leader, so timing out
+		// as well. We want to keep stressing until the cluster elects a
+		// new leader and start processing requests again.
+		return true
+	case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
+		// This retries when request is triggered at the same time as
+		// leader failure and follower nodes receive time out errors
+		// from losing their leader. Followers should retry to connect
+		// to the new leader.
+		return true
+	case etcdserver.ErrStopped.Error():
+		// one of the etcd nodes stopped from failure injection
+		return true
+	case rpctypes.ErrNotCapable.Error():
+		// capability check has not been done (in the beginning)
+		return true
+	case rpctypes.ErrTooManyRequests.Error():
+		// hitting the recovering member.
+		return true
+	case raft.ErrProposalDropped.Error():
+		// removed member, or leadership has changed (old leader got raftpb.MsgProp)
+		return true
+
+	// not retryable.
+	case context.Canceled.Error():
+		// from stresser.Cancel method:
+		return false
+	case grpc.ErrClientConnClosing.Error():
+		// from stresser.Cancel method:
+		return false
+	}
+
+	if status.Convert(err).Code() == codes.Unavailable {
+		// gRPC connection errors are translated to status.Unavailable
+		return true
+	}
+
+	s.lg.Warn(
+		"stress run exiting",
+		zap.String("stress-type", "KV"),
+		zap.String("endpoint", s.m.EtcdClientEndpoint),
+		zap.String("error-type", reflect.TypeOf(err).String()),
+		zap.String("error-desc", rpctypes.ErrorDesc(err)),
+		zap.Error(err),
+	)
+	return false
+}
+
 func (s *keyStresser) Pause() map[string]int {
 	return s.Close()
 }

integration/v3_grpc_inflight_test.go (+5, -3)

@@ -25,7 +25,8 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
 
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/transport"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 )
 
 
 // TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
 // TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
@@ -82,9 +83,10 @@ func TestV3KVInflightRangeRequests(t *testing.T) {
 			defer wg.Done()
 			_, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false))
 			if err != nil {
+				errCode := status.Convert(err).Code()
 				errDesc := rpctypes.ErrorDesc(err)
-				if err != nil && !(errDesc == context.Canceled.Error() || errDesc == transport.ErrConnClosing.Desc) {
-					t.Fatalf("inflight request should be canceled with '%v' or '%v', got '%v'", context.Canceled.Error(), transport.ErrConnClosing.Desc, errDesc)
+				if err != nil && !(errDesc == context.Canceled.Error() || errCode == codes.Unavailable) {
+					t.Fatalf("inflight request should be canceled with '%v' or code Unavailable, got '%v' with code '%s'", context.Canceled.Error(), errDesc, errCode)
 				}
 			}
 		}()