瀏覽代碼

clientv3: use retryConnection

Xiang Li 10 年之前
父節點
當前提交
a3b7876a3c
共有 4 個文件被更改,包括 46 次插入22 次删除
  1. 9 3
      clientv3/client.go
  2. 33 15
      clientv3/kv.go
  3. 1 1
      etcdserver/api/v3rpc/key.go
  4. 3 3
      etcdserver/api/v3rpc/member.go

+ 9 - 3
clientv3/client.go

@@ -18,6 +18,7 @@ import (
 	"sync"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
@@ -120,7 +121,7 @@ func (c *Client) activeConnection() *grpc.ClientConn {
 }
 
 // refreshConnection establishes a new connection
-func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) *grpc.ClientConn {
+func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.ClientConn, error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	if err != nil {
@@ -128,14 +129,15 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) *grpc.Clie
 	}
 	if oldConn != c.conn {
 		// conn has already been updated
-		return c.conn
+		return c.conn, nil
 	}
 	conn, dialErr := c.cfg.RetryDialer(c)
 	if dialErr != nil {
 		c.errors = append(c.errors, dialErr)
+		return nil, dialErr
 	}
 	c.conn = conn
-	return c.conn
+	return c.conn, nil
 }
 
 // dialEndpoints attempts to connect to each endpoint in order until a
@@ -152,3 +154,7 @@ func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
 	}
 	return nil, err
 }
+
+func isRPCError(err error) bool {
+	return grpc.Code(err) != codes.Unknown
+}

+ 33 - 15
clientv3/kv.go

@@ -16,6 +16,7 @@ package clientv3
 
 import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
@@ -87,30 +88,47 @@ type Txn interface {
 }
 
 type kv struct {
+	conn   *grpc.ClientConn // conn in-use
 	remote pb.KVClient
 
 	c *Client
 }
 
 func (kv *kv) Range(key, end string, limit, rev int64, sort *SortOption) (*pb.RangeResponse, error) {
-	r := kv.do(OpRange(key, end, limit, rev, sort))
+	r, err := kv.do(OpRange(key, end, limit, rev, sort))
+	if err != nil {
+		return nil, err
+	}
 	return r.GetResponseRange(), nil
 }
 
-func (kv *kv) do(op Op) *pb.ResponseUnion {
-	switch op.t {
-	// TODO: handle other ops
-	case tRange:
-		// TODO: setup sorting
-		r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev}
-		resp, err := kv.remote.Range(context.TODO(), r)
-		if err != nil {
-			// do something
+func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
+	for {
+		var err error
+		switch op.t {
+		// TODO: handle other ops
+		case tRange:
+			var resp *pb.RangeResponse
+			// TODO: setup sorting
+			r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev}
+			resp, err = kv.remote.Range(context.TODO(), r)
+			if err == nil {
+				return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseRange{resp}}, nil
+			}
+		default:
+			panic("Unknown op")
+		}
+
+		if isRPCError(err) {
+			return nil, err
 		}
-		return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseRange{resp}}
-	default:
-		panic("Unknown op")
-	}
 
-	return nil
+		newConn, cerr := kv.c.retryConnection(kv.conn, err)
+		if cerr != nil {
+			// TODO: return client lib defined connection error
+			return nil, cerr
+		}
+		kv.conn = newConn
+		kv.remote = pb.NewKVClient(kv.conn)
+	}
 }

+ 1 - 1
etcdserver/api/v3rpc/key.go

@@ -215,6 +215,6 @@ func togRPCError(err error) error {
 		return ErrFutureRev
 	// TODO: handle error from raft and timeout
 	default:
-		return grpc.Errorf(codes.Unknown, err.Error())
+		return grpc.Errorf(codes.Internal, err.Error())
 	}
 }

+ 3 - 3
etcdserver/api/v3rpc/member.go

@@ -54,7 +54,7 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest)
 	case err == etcdserver.ErrPeerURLexists:
 		return nil, ErrPeerURLExist
 	case err != nil:
-		return nil, grpc.Errorf(codes.Unknown, err.Error())
+		return nil, grpc.Errorf(codes.Internal, err.Error())
 	}
 
 	return &pb.MemberAddResponse{
@@ -71,7 +71,7 @@ func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveReq
 	case err == etcdserver.ErrIDNotFound:
 		return nil, ErrMemberNotFound
 	case err != nil:
-		return nil, grpc.Errorf(codes.Unknown, err.Error())
+		return nil, grpc.Errorf(codes.Internal, err.Error())
 	}
 
 	return &pb.MemberRemoveResponse{Header: cs.header()}, nil
@@ -89,7 +89,7 @@ func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateReq
 	case err == etcdserver.ErrIDNotFound:
 		return nil, ErrMemberNotFound
 	case err != nil:
-		return nil, grpc.Errorf(codes.Unknown, err.Error())
+		return nil, grpc.Errorf(codes.Internal, err.Error())
 	}
 
 	return &pb.MemberUpdateResponse{Header: cs.header()}, nil