Browse Source

clientv3: convert errors to rpctypes on returning

For https://github.com/coreos/etcd/issues/5211.
Gyu-Ho Lee 9 years ago
parent
commit
2e3d79a7bf
8 changed files with 64 additions and 59 deletions
  1. 2 2
      clientv3/client.go
  2. 4 4
      clientv3/client_test.go
  3. 13 12
      clientv3/cluster.go
  4. 11 10
      clientv3/kv.go
  5. 13 12
      clientv3/lease.go
  6. 13 12
      clientv3/maintenance.go
  7. 4 3
      clientv3/txn.go
  8. 4 4
      clientv3/watch.go

+ 2 - 2
clientv3/client.go

@@ -235,9 +235,9 @@ func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
 	return nil, err
 }
 
-// isHalted returns true if the given error and context indicate no forward
+// isHaltErr returns true if the given error and context indicate no forward
 // progress can be made, even after reconnecting.
-func isHalted(ctx context.Context, err error) bool {
+func isHaltErr(ctx context.Context, err error) bool {
 	isRPCError := strings.HasPrefix(grpc.ErrorDesc(err), "etcdserver: ")
 	return isRPCError || ctx.Err() != nil
 }

+ 4 - 4
clientv3/client_test.go

@@ -55,16 +55,16 @@ func TestDialTimeout(t *testing.T) {
 	}
 }
 
-func TestIsHalted(t *testing.T) {
-	if !isHalted(nil, fmt.Errorf("etcdserver: some etcdserver error")) {
+func TestIsHaltErr(t *testing.T) {
+	if !isHaltErr(nil, fmt.Errorf("etcdserver: some etcdserver error")) {
 		t.Errorf(`error prefixed with "etcdserver: " should be Halted`)
 	}
 	ctx, cancel := context.WithCancel(context.TODO())
-	if isHalted(ctx, nil) {
+	if isHaltErr(ctx, nil) {
 		t.Errorf("no error and active context should not be Halted")
 	}
 	cancel()
-	if !isHalted(ctx, nil) {
+	if !isHaltErr(ctx, nil) {
 		t.Errorf("cancel on context should be Halted")
 	}
 }

+ 13 - 12
clientv3/cluster.go

@@ -17,6 +17,7 @@ package clientv3
 import (
 	"sync"
 
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
@@ -70,12 +71,12 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd
 		return (*MemberAddResponse)(resp), nil
 	}
 
-	if isHalted(ctx, err) {
-		return nil, err
+	if isHaltErr(ctx, err) {
+		return nil, rpctypes.Error(err)
 	}
 
 	go c.switchRemote(err)
-	return nil, err
+	return nil, rpctypes.Error(err)
 }
 
 func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
@@ -85,12 +86,12 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
 		return (*MemberRemoveResponse)(resp), nil
 	}
 
-	if isHalted(ctx, err) {
-		return nil, err
+	if isHaltErr(ctx, err) {
+		return nil, rpctypes.Error(err)
 	}
 
 	go c.switchRemote(err)
-	return nil, err
+	return nil, rpctypes.Error(err)
 }
 
 func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
@@ -102,13 +103,13 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
 			return (*MemberUpdateResponse)(resp), nil
 		}
 
-		if isHalted(ctx, err) {
-			return nil, err
+		if isHaltErr(ctx, err) {
+			return nil, rpctypes.Error(err)
 		}
 
 		err = c.switchRemote(err)
 		if err != nil {
-			return nil, err
+			return nil, rpctypes.Error(err)
 		}
 	}
 }
@@ -121,13 +122,13 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
 			return (*MemberListResponse)(resp), nil
 		}
 
-		if isHalted(ctx, err) {
-			return nil, err
+		if isHaltErr(ctx, err) {
+			return nil, rpctypes.Error(err)
 		}
 
 		err = c.switchRemote(err)
 		if err != nil {
-			return nil, err
+			return nil, rpctypes.Error(err)
 		}
 	}
 }

+ 11 - 10
clientv3/kv.go

@@ -17,6 +17,7 @@ package clientv3
 import (
 	"sync"
 
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
@@ -96,17 +97,17 @@ func NewKV(c *Client) KV {
 
 func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
 	r, err := kv.Do(ctx, OpPut(key, val, opts...))
-	return r.put, err
+	return r.put, rpctypes.Error(err)
 }
 
 func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
 	r, err := kv.Do(ctx, OpGet(key, opts...))
-	return r.get, err
+	return r.get, rpctypes.Error(err)
 }
 
 func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
 	r, err := kv.Do(ctx, OpDelete(key, opts...))
-	return r.del, err
+	return r.del, rpctypes.Error(err)
 }
 
 func (kv *kv) Compact(ctx context.Context, rev int64) error {
@@ -116,12 +117,12 @@ func (kv *kv) Compact(ctx context.Context, rev int64) error {
 		return nil
 	}
 
-	if isHalted(ctx, err) {
-		return err
+	if isHaltErr(ctx, err) {
+		return rpctypes.Error(err)
 	}
 
 	go kv.switchRemote(err)
-	return err
+	return rpctypes.Error(err)
 }
 
 func (kv *kv) Txn(ctx context.Context) Txn {
@@ -166,14 +167,14 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
 			panic("Unknown op")
 		}
 
-		if isHalted(ctx, err) {
-			return OpResponse{}, err
+		if isHaltErr(ctx, err) {
+			return OpResponse{}, rpctypes.Error(err)
 		}
 
 		// do not retry on modifications
 		if op.isWrite() {
 			go kv.switchRemote(err)
-			return OpResponse{}, err
+			return OpResponse{}, rpctypes.Error(err)
 		}
 
 		if nerr := kv.switchRemote(err); nerr != nil {
@@ -192,7 +193,7 @@ func (kv *kv) switchRemote(prevErr error) error {
 
 	newConn, err := kv.c.retryConnection(kv.conn, prevErr)
 	if err != nil {
-		return err
+		return rpctypes.Error(err)
 	}
 
 	kv.conn = newConn

+ 13 - 12
clientv3/lease.go

@@ -134,9 +134,10 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
 			}
 			return gresp, nil
 		}
-		if isHalted(cctx, err) {
-			return nil, err
+		if isHaltErr(cctx, err) {
+			return nil, rpctypes.Error(err)
 		}
+
 		if nerr := l.switchRemoteAndStream(err); nerr != nil {
 			return nil, nerr
 		}
@@ -155,8 +156,8 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
 		if err == nil {
 			return (*LeaseRevokeResponse)(resp), nil
 		}
-		if isHalted(ctx, err) {
-			return nil, err
+		if isHaltErr(ctx, err) {
+			return nil, rpctypes.Error(err)
 		}
 
 		if nerr := l.switchRemoteAndStream(err); nerr != nil {
@@ -204,8 +205,8 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
 			}
 			return resp, err
 		}
-		if isHalted(ctx, err) {
-			return resp, err
+		if isHaltErr(ctx, err) {
+			return nil, rpctypes.Error(err)
 		}
 
 		nerr := l.switchRemoteAndStream(err)
@@ -259,17 +260,17 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
 
 	stream, err := l.getRemote().LeaseKeepAlive(cctx)
 	if err != nil {
-		return nil, err
+		return nil, rpctypes.Error(err)
 	}
 
 	err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
 	if err != nil {
-		return nil, err
+		return nil, rpctypes.Error(err)
 	}
 
 	resp, rerr := stream.Recv()
 	if rerr != nil {
-		return nil, rerr
+		return nil, rpctypes.Error(rerr)
 	}
 
 	karesp := &LeaseKeepAliveResponse{
@@ -296,7 +297,7 @@ func (l *lessor) recvKeepAliveLoop() {
 	for serr == nil {
 		resp, err := stream.Recv()
 		if err != nil {
-			if isHalted(l.stopCtx, err) {
+			if isHaltErr(l.stopCtx, err) {
 				return
 			}
 			stream, serr = l.resetRecv()
@@ -411,7 +412,7 @@ func (l *lessor) switchRemoteAndStream(prevErr error) error {
 			conn.Close()
 			newConn, err = l.c.retryConnection(conn, prevErr)
 			if err != nil {
-				return err
+				return rpctypes.Error(err)
 			}
 		}
 
@@ -436,7 +437,7 @@ func (l *lessor) newStream() error {
 	stream, err := l.getRemote().LeaseKeepAlive(sctx)
 	if err != nil {
 		cancel()
-		return err
+		return rpctypes.Error(err)
 	}
 
 	l.mu.Lock()

+ 13 - 12
clientv3/maintenance.go

@@ -18,6 +18,7 @@ import (
 	"io"
 	"sync"
 
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
@@ -81,8 +82,8 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
 		if err == nil {
 			return (*AlarmResponse)(resp), nil
 		}
-		if isHalted(ctx, err) {
-			return nil, err
+		if isHaltErr(ctx, err) {
+			return nil, rpctypes.Error(err)
 		}
 		if err = m.switchRemote(err); err != nil {
 			return nil, err
@@ -100,13 +101,13 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
 	if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
 		ar, err := m.AlarmList(ctx)
 		if err != nil {
-			return nil, err
+			return nil, rpctypes.Error(err)
 		}
 		ret := AlarmResponse{}
 		for _, am := range ar.Alarms {
 			dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
 			if derr != nil {
-				return nil, derr
+				return nil, rpctypes.Error(derr)
 			}
 			ret.Alarms = append(ret.Alarms, dresp.Alarms...)
 		}
@@ -117,21 +118,21 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
 	if err == nil {
 		return (*AlarmResponse)(resp), nil
 	}
-	if !isHalted(ctx, err) {
+	if isHaltErr(ctx, err) {
 		go m.switchRemote(err)
 	}
-	return nil, err
+	return nil, rpctypes.Error(err)
 }
 
 func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
 	conn, err := m.c.Dial(endpoint)
 	if err != nil {
-		return nil, err
+		return nil, rpctypes.Error(err)
 	}
 	remote := pb.NewMaintenanceClient(conn)
 	resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{})
 	if err != nil {
-		return nil, err
+		return nil, rpctypes.Error(err)
 	}
 	return (*DefragmentResponse)(resp), nil
 }
@@ -139,12 +140,12 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
 func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
 	conn, err := m.c.Dial(endpoint)
 	if err != nil {
-		return nil, err
+		return nil, rpctypes.Error(err)
 	}
 	remote := pb.NewMaintenanceClient(conn)
 	resp, err := remote.Status(ctx, &pb.StatusRequest{})
 	if err != nil {
-		return nil, err
+		return nil, rpctypes.Error(err)
 	}
 	return (*StatusResponse)(resp), nil
 }
@@ -152,7 +153,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
 func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
 	ss, err := m.getRemote().Snapshot(ctx, &pb.SnapshotRequest{})
 	if err != nil {
-		return nil, err
+		return nil, rpctypes.Error(err)
 	}
 
 	pr, pw := io.Pipe()
@@ -187,7 +188,7 @@ func (m *maintenance) switchRemote(prevErr error) error {
 	defer m.mu.Unlock()
 	newConn, err := m.c.retryConnection(m.conn, prevErr)
 	if err != nil {
-		return err
+		return rpctypes.Error(err)
 	}
 	m.conn = newConn
 	m.remote = pb.NewMaintenanceClient(m.conn)

+ 4 - 3
clientv3/txn.go

@@ -17,6 +17,7 @@ package clientv3
 import (
 	"sync"
 
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 )
@@ -146,13 +147,13 @@ func (txn *txn) Commit() (*TxnResponse, error) {
 			return (*TxnResponse)(resp), nil
 		}
 
-		if isHalted(txn.ctx, err) {
-			return nil, err
+		if isHaltErr(txn.ctx, err) {
+			return nil, rpctypes.Error(err)
 		}
 
 		if txn.isWrite {
 			go kv.switchRemote(err)
-			return nil, err
+			return nil, rpctypes.Error(err)
 		}
 
 		if nerr := kv.switchRemote(err); nerr != nil {

+ 4 - 4
clientv3/watch.go

@@ -209,7 +209,7 @@ func (w *watcher) Close() error {
 	case <-w.donec:
 	}
 	<-w.donec
-	return <-w.errc
+	return v3rpc.Error(<-w.errc)
 }
 
 func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
@@ -496,7 +496,7 @@ func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) {
 			break
 		}
 	}
-	return ws, err
+	return ws, v3rpc.Error(err)
 }
 
 // openWatchClient retries opening a watchclient until retryConnection fails
@@ -504,8 +504,8 @@ func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 	for {
 		if ws, err = w.remote.Watch(w.ctx); ws != nil {
 			break
-		} else if isHalted(w.ctx, err) {
-			return nil, err
+		} else if isHaltErr(w.ctx, err) {
+			return nil, v3rpc.Error(err)
 		}
 		newConn, nerr := w.c.retryConnection(w.conn, nil)
 		if nerr != nil {