Browse Source

Merge pull request #6264 from heyitsanthony/error-codes

clientv3: use grpc codes to translate raw grpc errors
Anthony Romano 9 years ago
parent
commit
c388b2f22f

+ 15 - 16
clientv3/client.go

@@ -29,6 +29,7 @@ import (
 
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/metadata"
 )
@@ -294,17 +295,7 @@ func isHaltErr(ctx context.Context, err error) bool {
 	if err == nil {
 		return false
 	}
-	eErr := rpctypes.Error(err)
-	if _, ok := eErr.(rpctypes.EtcdError); ok {
-		return eErr != rpctypes.ErrStopped && eErr != rpctypes.ErrNoLeader
-	}
-	// treat etcdserver errors not recognized by the client as halting
-	return isConnClosing(err) || strings.Contains(err.Error(), "etcdserver:")
-}
-
-// isConnClosing returns true if the error matches a grpc client closing error
-func isConnClosing(err error) bool {
-	return strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error())
+	return grpc.Code(err) != codes.Unavailable
 }
 
 func toErr(ctx context.Context, err error) error {
@@ -312,12 +303,20 @@ func toErr(ctx context.Context, err error) error {
 		return nil
 	}
 	err = rpctypes.Error(err)
-	switch {
-	case ctx.Err() != nil && strings.Contains(err.Error(), "context"):
-		err = ctx.Err()
-	case strings.Contains(err.Error(), ErrNoAvailableEndpoints.Error()):
+	if _, ok := err.(rpctypes.EtcdError); ok {
+		return err
+	}
+	code := grpc.Code(err)
+	switch code {
+	case codes.DeadlineExceeded:
+		fallthrough
+	case codes.Canceled:
+		if ctx.Err() != nil {
+			err = ctx.Err()
+		}
+	case codes.Unavailable:
 		err = ErrNoAvailableEndpoints
-	case strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error()):
+	case codes.FailedPrecondition:
 		err = grpc.ErrClientConnClosing
 	}
 	return err

+ 5 - 5
clientv3/client_test.go

@@ -19,7 +19,7 @@ import (
 	"testing"
 	"time"
 
-	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/pkg/testutil"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
@@ -72,11 +72,11 @@ func TestIsHaltErr(t *testing.T) {
 	if !isHaltErr(nil, fmt.Errorf("etcdserver: some etcdserver error")) {
 		t.Errorf(`error prefixed with "etcdserver: " should be Halted by default`)
 	}
-	if isHaltErr(nil, etcdserver.ErrStopped) {
-		t.Errorf("error %v should not halt", etcdserver.ErrStopped)
+	if isHaltErr(nil, rpctypes.ErrGRPCStopped) {
+		t.Errorf("error %v should not halt", rpctypes.ErrGRPCStopped)
 	}
-	if isHaltErr(nil, etcdserver.ErrNoLeader) {
-		t.Errorf("error %v should not halt", etcdserver.ErrNoLeader)
+	if isHaltErr(nil, rpctypes.ErrGRPCNoLeader) {
+		t.Errorf("error %v should not halt", rpctypes.ErrGRPCNoLeader)
 	}
 	ctx, cancel := context.WithCancel(context.TODO())
 	if isHaltErr(ctx, nil) {

+ 3 - 3
clientv3/integration/txn_test.go

@@ -73,6 +73,7 @@ func TestTxnWriteFail(t *testing.T) {
 	}()
 
 	go func() {
+		defer close(getc)
 		select {
 		case <-time.After(5 * time.Second):
 			t.Fatalf("timed out waiting for txn fail")
@@ -86,11 +87,10 @@ func TestTxnWriteFail(t *testing.T) {
 		if len(gresp.Kvs) != 0 {
 			t.Fatalf("expected no keys, got %v", gresp.Kvs)
 		}
-		close(getc)
 	}()
 
 	select {
-	case <-time.After(5 * time.Second):
+	case <-time.After(2 * clus.Members[1].ServerConfig.ReqTimeout()):
 		t.Fatalf("timed out waiting for get")
 	case <-getc:
 	}
@@ -125,7 +125,7 @@ func TestTxnReadRetry(t *testing.T) {
 	clus.Members[0].Restart(t)
 	select {
 	case <-donec:
-	case <-time.After(5 * time.Second):
+	case <-time.After(2 * clus.Members[1].ServerConfig.ReqTimeout()):
 		t.Fatalf("waited too long")
 	}
 }

+ 12 - 2
clientv3/retry.go

@@ -15,9 +15,11 @@
 package clientv3
 
 import (
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 )
 
 type rpcFunc func(ctx context.Context) error
@@ -27,8 +29,16 @@ func (c *Client) newRetryWrapper() retryRpcFunc {
 	return func(rpcCtx context.Context, f rpcFunc) {
 		for {
 			err := f(rpcCtx)
-			// ignore grpc conn closing on fail-fast calls; they are transient errors
-			if err == nil || !isConnClosing(err) {
+			if err == nil {
+				return
+			}
+			// only retry if unavailable
+			if grpc.Code(err) != codes.Unavailable {
+				return
+			}
+			// always stop retry on etcd errors
+			eErr := rpctypes.Error(err)
+			if _, ok := eErr.(rpctypes.EtcdError); ok {
 				return
 			}
 			select {

+ 15 - 9
etcdserver/api/v3rpc/rpctypes/error.go

@@ -51,9 +51,11 @@ var (
 	ErrGRPCPermissionNotGranted = grpc.Errorf(codes.FailedPrecondition, "etcdserver: permission is not granted to the role")
 	ErrGRPCAuthNotEnabled       = grpc.Errorf(codes.FailedPrecondition, "etcdserver: authentication is not enabled")
 
-	ErrGRPCNoLeader   = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
-	ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
-	ErrGRPCStopped    = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped")
+	ErrGRPCNoLeader               = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
+	ErrGRPCNotCapable             = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
+	ErrGRPCStopped                = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped")
+	ErrGRPCTimeout                = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out")
+	ErrGRPCTimeoutDueToLeaderFail = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure")
 
 	errStringToError = map[string]error{
 		grpc.ErrorDesc(ErrGRPCEmptyKey):     ErrGRPCEmptyKey,
@@ -86,9 +88,11 @@ var (
 		grpc.ErrorDesc(ErrGRPCPermissionNotGranted): ErrGRPCPermissionNotGranted,
 		grpc.ErrorDesc(ErrGRPCAuthNotEnabled):       ErrGRPCAuthNotEnabled,
 
-		grpc.ErrorDesc(ErrGRPCNoLeader):   ErrGRPCNoLeader,
-		grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
-		grpc.ErrorDesc(ErrGRPCStopped):    ErrGRPCStopped,
+		grpc.ErrorDesc(ErrGRPCNoLeader):               ErrGRPCNoLeader,
+		grpc.ErrorDesc(ErrGRPCNotCapable):             ErrGRPCNotCapable,
+		grpc.ErrorDesc(ErrGRPCStopped):                ErrGRPCStopped,
+		grpc.ErrorDesc(ErrGRPCTimeout):                ErrGRPCTimeout,
+		grpc.ErrorDesc(ErrGRPCTimeoutDueToLeaderFail): ErrGRPCTimeoutDueToLeaderFail,
 	}
 
 	// client-side error
@@ -122,9 +126,11 @@ var (
 	ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
 	ErrAuthNotEnabled       = Error(ErrGRPCAuthNotEnabled)
 
-	ErrNoLeader   = Error(ErrGRPCNoLeader)
-	ErrNotCapable = Error(ErrGRPCNotCapable)
-	ErrStopped    = Error(ErrGRPCStopped)
+	ErrNoLeader               = Error(ErrGRPCNoLeader)
+	ErrNotCapable             = Error(ErrGRPCNotCapable)
+	ErrStopped                = Error(ErrGRPCStopped)
+	ErrTimeout                = Error(ErrGRPCTimeout)
+	ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
 )
 
 // EtcdError defines gRPC server errors.

+ 9 - 1
etcdserver/api/v3rpc/util.go

@@ -32,7 +32,6 @@ func togRPCError(err error) error {
 		return rpctypes.ErrGRPCFutureRev
 	case lease.ErrLeaseNotFound:
 		return rpctypes.ErrGRPCLeaseNotFound
-	// TODO: handle error from raft and timeout
 	case etcdserver.ErrRequestTooLarge:
 		return rpctypes.ErrGRPCRequestTooLarge
 	case etcdserver.ErrNoSpace:
@@ -40,6 +39,15 @@ func togRPCError(err error) error {
 	case etcdserver.ErrTooManyRequests:
 		return rpctypes.ErrTooManyRequests
 
+	case etcdserver.ErrNoLeader:
+		return rpctypes.ErrGRPCNoLeader
+	case etcdserver.ErrStopped:
+		return rpctypes.ErrGRPCStopped
+	case etcdserver.ErrTimeout:
+		return rpctypes.ErrGRPCTimeout
+	case etcdserver.ErrTimeoutDueToLeaderFail:
+		return rpctypes.ErrGRPCTimeoutDueToLeaderFail
+
 	case auth.ErrRootUserNotExist:
 		return rpctypes.ErrGRPCRootUserNotExist
 	case auth.ErrRootRoleNotExist: