Selaa lähdekoodia

*: deprecate grpc.Code, grpc.ErrorDesc

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Gyu-Ho Lee 8 vuotta sitten
vanhempi
commit
6f76d52a1a

+ 5 - 3
clientv3/client.go

@@ -33,6 +33,7 @@ import (
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
 )
 
 var (
@@ -478,14 +479,14 @@ func isHaltErr(ctx context.Context, err error) bool {
 	if err == nil {
 		return false
 	}
-	code := grpc.Code(err)
+	ev, _ := status.FromError(err)
 	// Unavailable codes mean the system will be right back.
 	// (e.g., can't connect, lost leader)
 	// Treat Internal codes as if something failed, leaving the
 	// system in an inconsistent state, but retrying could make progress.
 	// (e.g., failed in middle of send, corrupted frame)
 	// TODO: are permanent Internal errors possible from grpc?
-	return code != codes.Unavailable && code != codes.Internal
+	return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal
 }
 
 func toErr(ctx context.Context, err error) error {
@@ -496,7 +497,8 @@ func toErr(ctx context.Context, err error) error {
 	if _, ok := err.(rpctypes.EtcdError); ok {
 		return err
 	}
-	code := grpc.Code(err)
+	ev, _ := status.FromError(err)
+	code := ev.Code()
 	switch code {
 	case codes.DeadlineExceeded:
 		fallthrough

+ 5 - 2
clientv3/leasing/kv.go

@@ -26,8 +26,8 @@ import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 
-	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 type leasingKV struct {
@@ -282,7 +282,10 @@ func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.Tx
 			return resp, nil
 		}
 		// retry if transient error
-		if _, ok := err.(rpctypes.EtcdError); ok || grpc.Code(err) != codes.Unavailable {
+		if _, ok := err.(rpctypes.EtcdError); ok {
+			return nil, err
+		}
+		if ev, _ := status.FromError(err); ev.Code() != codes.Unavailable {
 			return nil, err
 		}
 	}

+ 8 - 3
clientv3/retry.go

@@ -22,6 +22,7 @@ import (
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 type rpcFunc func(ctx context.Context) error
@@ -35,12 +36,16 @@ func isReadStopError(err error) bool {
 		return true
 	}
 	// only retry if unavailable
-	return grpc.Code(err) != codes.Unavailable
+	ev, _ := status.FromError(err)
+	return ev.Code() != codes.Unavailable
 }
 
 func isWriteStopError(err error) bool {
-	return grpc.Code(err) != codes.Unavailable ||
-		grpc.ErrorDesc(err) != "there is no address available"
+	ev, _ := status.FromError(err)
+	if ev.Code() != codes.Unavailable {
+		return true
+	}
+	return rpctypes.ErrorDesc(err) != "there is no address available"
 }
 
 func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {

+ 5 - 2
integration/v3_grpc_inflight_test.go

@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
 
@@ -79,8 +80,10 @@ func TestV3KVInflightRangeRequests(t *testing.T) {
 		go func() {
 			defer wg.Done()
 			_, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false))
-			if err != nil && grpc.ErrorDesc(err) != context.Canceled.Error() {
-				t.Fatalf("inflight request should be canceld with %v, got %v", context.Canceled, err)
+			if err != nil {
+				if err != nil && rpctypes.ErrorDesc(err) != context.Canceled.Error() {
+					t.Fatalf("inflight request should be canceld with %v, got %v", context.Canceled, err)
+				}
 			}
 		}()
 	}

+ 3 - 3
integration/v3_grpc_test.go

@@ -1778,7 +1778,7 @@ func TestGRPCRequireLeader(t *testing.T) {
 	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
 	ctx := metadata.NewOutgoingContext(context.Background(), md)
 	reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
-	if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
+	if _, err := toGRPC(client).KV.Put(ctx, reqput); rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
 		t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
 	}
 }
@@ -1809,7 +1809,7 @@ func TestGRPCStreamRequireLeader(t *testing.T) {
 
 	// existing stream should be rejected
 	_, err = wStream.Recv()
-	if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
+	if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
 		t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
 	}
 
@@ -1819,7 +1819,7 @@ func TestGRPCStreamRequireLeader(t *testing.T) {
 		t.Fatalf("wAPI.Watch error: %v", err)
 	}
 	_, err = wStream.Recv()
-	if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
+	if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
 		t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
 	}
 

+ 1 - 2
integration/v3_lease_test.go

@@ -25,7 +25,6 @@ import (
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"github.com/coreos/etcd/pkg/testutil"
 
-	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 )
 
@@ -560,7 +559,7 @@ func TestV3LeaseRequireLeader(t *testing.T) {
 		if err == nil {
 			t.Fatalf("got response %+v, expected error", resp)
 		}
-		if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
+		if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
 			t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
 		}
 	}()

+ 2 - 2
proxy/grpcproxy/cluster.go

@@ -22,10 +22,10 @@ import (
 
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/clientv3/naming"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 
 	"golang.org/x/time/rate"
-	"google.golang.org/grpc"
 	gnaming "google.golang.org/grpc/naming"
 )
 
@@ -89,7 +89,7 @@ func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
 		ups, err := wa.Next()
 		if err != nil {
 			plog.Warningf("clusterProxy watcher error (%v)", err)
-			if grpc.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
+			if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
 				return
 			}
 		}

+ 2 - 1
proxy/grpcproxy/leader.go

@@ -20,6 +20,7 @@ import (
 	"sync"
 
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 
 	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
@@ -68,7 +69,7 @@ func (l *leader) recvLoop() {
 		}
 		if cresp.Err() != nil {
 			l.loseLeader()
-			if grpc.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() {
+			if rpctypes.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() {
 				close(l.disconnc)
 				return
 			}

+ 3 - 2
tools/functional-tester/etcd-runner/command/lease_renewer_command.go

@@ -24,8 +24,8 @@ import (
 	"github.com/coreos/etcd/clientv3"
 
 	"github.com/spf13/cobra"
-	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 var (
@@ -68,7 +68,8 @@ func runLeaseRenewerFunc(cmd *cobra.Command, args []string) {
 
 		for {
 			lk, err = c.Lease.KeepAliveOnce(ctx, l.ID)
-			if grpc.Code(err) == codes.NotFound {
+			ev, _ := status.FromError(err)
+			if ev.Code() == codes.NotFound {
 				if time.Since(expire) < 0 {
 					log.Fatalf("bad renew! exceeded: %v", time.Since(expire))
 					for {

+ 1 - 1
tools/functional-tester/etcd-tester/key_stresser.go

@@ -106,7 +106,7 @@ func (s *keyStresser) run(ctx context.Context) {
 			continue
 		}
 
-		switch grpc.ErrorDesc(err) {
+		switch rpctypes.ErrorDesc(err) {
 		case context.DeadlineExceeded.Error():
 			// This retries when request is triggered at the same time as
 			// leader failure. When we terminate the leader, the request to