Browse Source

Merge pull request #6288 from heyitsanthony/fix-retryread

clientv3: retry non-mutable rpcs on Internal codes
Anthony Romano 9 years ago
parent
commit
e53b99588a
4 changed files with 32 additions and 31 deletions
  1. 8 1
      clientv3/client.go
  2. 7 1
      clientv3/txn.go
  3. 6 28
      etcdserver/api/v3rpc/member.go
  4. 11 1
      etcdserver/api/v3rpc/util.go

+ 8 - 1
clientv3/client.go

@@ -295,7 +295,14 @@ func isHaltErr(ctx context.Context, err error) bool {
 	if err == nil {
 		return false
 	}
-	return grpc.Code(err) != codes.Unavailable
+	code := grpc.Code(err)
+	// Unavailable codes mean the system will be right back.
+	// (e.g., can't connect, lost leader)
+	// Treat Internal codes as if something failed, leaving the
+	// system in an inconsistent state, but retrying could make progress.
+	// (e.g., failed in middle of send, corrupted frame)
+	// TODO: are permanent Internal errors possible from grpc?
+	return code != codes.Unavailable && code != codes.Internal
 }
 
 func toErr(ctx context.Context, err error) error {

+ 7 - 1
clientv3/txn.go

@@ -19,6 +19,7 @@ import (
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
+	"google.golang.org/grpc"
 )
 
 // Txn is the interface that wraps mini-transactions.
@@ -152,7 +153,12 @@ func (txn *txn) Commit() (*TxnResponse, error) {
 
 func (txn *txn) commit() (*TxnResponse, error) {
 	r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
-	resp, err := txn.kv.remote.Txn(txn.ctx, r)
+
+	var opts []grpc.CallOption
+	if !txn.isWrite {
+		opts = []grpc.CallOption{grpc.FailFast(false)}
+	}
+	resp, err := txn.kv.remote.Txn(txn.ctx, r, opts...)
 	if err != nil {
 		return nil, err
 	}

+ 6 - 28
etcdserver/api/v3rpc/member.go

@@ -24,8 +24,6 @@ import (
 	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/pkg/types"
 	"golang.org/x/net/context"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
 )
 
 type ClusterServer struct {
@@ -50,14 +48,8 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest)
 
 	now := time.Now()
 	m := membership.NewMember("", urls, "", &now)
-	err = cs.server.AddMember(ctx, *m)
-	switch {
-	case err == membership.ErrIDExists:
-		return nil, rpctypes.ErrGRPCMemberExist
-	case err == membership.ErrPeerURLexists:
-		return nil, rpctypes.ErrGRPCPeerURLExist
-	case err != nil:
-		return nil, grpc.Errorf(codes.Internal, err.Error())
+	if err = cs.server.AddMember(ctx, *m); err != nil {
+		return nil, togRPCError(err)
 	}
 
 	return &pb.MemberAddResponse{
@@ -67,16 +59,9 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest)
 }
 
 func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
-	err := cs.server.RemoveMember(ctx, r.ID)
-	switch {
-	case err == membership.ErrIDRemoved:
-		fallthrough
-	case err == membership.ErrIDNotFound:
-		return nil, rpctypes.ErrGRPCMemberNotFound
-	case err != nil:
-		return nil, grpc.Errorf(codes.Internal, err.Error())
+	if err := cs.server.RemoveMember(ctx, r.ID); err != nil {
+		return nil, togRPCError(err)
 	}
-
 	return &pb.MemberRemoveResponse{Header: cs.header()}, nil
 }
 
@@ -85,16 +70,9 @@ func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateReq
 		ID:             types.ID(r.ID),
 		RaftAttributes: membership.RaftAttributes{PeerURLs: r.PeerURLs},
 	}
-	err := cs.server.UpdateMember(ctx, m)
-	switch {
-	case err == membership.ErrPeerURLexists:
-		return nil, rpctypes.ErrGRPCPeerURLExist
-	case err == membership.ErrIDNotFound:
-		return nil, rpctypes.ErrGRPCMemberNotFound
-	case err != nil:
-		return nil, grpc.Errorf(codes.Internal, err.Error())
+	if err := cs.server.UpdateMember(ctx, m); err != nil {
+		return nil, togRPCError(err)
 	}
-
 	return &pb.MemberUpdateResponse{Header: cs.header()}, nil
 }
 

+ 11 - 1
etcdserver/api/v3rpc/util.go

@@ -18,6 +18,7 @@ import (
 	"github.com/coreos/etcd/auth"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc"
 	"google.golang.org/grpc"
@@ -26,6 +27,15 @@ import (
 
 func togRPCError(err error) error {
 	switch err {
+	case membership.ErrIDRemoved:
+		return rpctypes.ErrGRPCMemberNotFound
+	case membership.ErrIDNotFound:
+		return rpctypes.ErrGRPCMemberNotFound
+	case membership.ErrIDExists:
+		return rpctypes.ErrGRPCMemberExist
+	case membership.ErrPeerURLexists:
+		return rpctypes.ErrGRPCPeerURLExist
+
 	case mvcc.ErrCompacted:
 		return rpctypes.ErrGRPCCompacted
 	case mvcc.ErrFutureRev:
@@ -71,6 +81,6 @@ func togRPCError(err error) error {
 	case auth.ErrAuthNotEnabled:
 		return rpctypes.ErrGRPCAuthNotEnabled
 	default:
-		return grpc.Errorf(codes.Internal, err.Error())
+		return grpc.Errorf(codes.Unknown, err.Error())
 	}
 }