Browse Source

clientv3: make balancer respect FastFail

The simpleBalancer.Get() blocks grpc.Invoke() even when the Invoke() is called
with the FailFast option. Therefore currently any requests with the
FastFail option actually doesn't fail fast. They get blocked when there is
no endpoints available.
Get() method needs to respect the BlockingWait option when
picks up an endpoint address from the list and fail immediately when the option is
enabled and no endpoint is available.
Iwasaki Yudai 9 năm trước cách đây
mục cha
commit
6a33f0ffd5
6 tập tin đã thay đổi với 144 bổ sung18 xóa
  1. 10 10
      clientv3/auth.go
  2. 20 0
      clientv3/balancer.go
  3. 106 0
      clientv3/balancer_test.go
  4. 3 3
      clientv3/cluster.go
  5. 2 2
      clientv3/kv.go
  6. 3 3
      clientv3/lease.go

+ 10 - 10
clientv3/auth.go

@@ -116,32 +116,32 @@ func NewAuth(c *Client) Auth {
 }
 
 func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
-	resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{})
+	resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false))
 	return (*AuthEnableResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
-	resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{})
+	resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false))
 	return (*AuthDisableResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
-	resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password})
+	resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, grpc.FailFast(false))
 	return (*AuthUserAddResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
-	resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name})
+	resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, grpc.FailFast(false))
 	return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
-	resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password})
+	resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, grpc.FailFast(false))
 	return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
-	resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role})
+	resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, grpc.FailFast(false))
 	return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
 }
 
@@ -156,12 +156,12 @@ func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
 }
 
 func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
-	resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role})
+	resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, grpc.FailFast(false))
 	return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
-	resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name})
+	resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, grpc.FailFast(false))
 	return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
 }
 
@@ -186,12 +186,12 @@ func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
 }
 
 func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
-	resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd})
+	resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, grpc.FailFast(false))
 	return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
-	resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role})
+	resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, grpc.FailFast(false))
 	return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
 }
 

+ 20 - 0
clientv3/balancer.go

@@ -15,6 +15,7 @@
 package clientv3
 
 import (
+	"errors"
 	"net/url"
 	"strings"
 	"sync"
@@ -23,6 +24,11 @@ import (
 	"google.golang.org/grpc"
 )
 
+// ErrNoAddrAvilable is returned by Get() when the balancer does not have
+// any active connection to endpoints at the time.
+// This error is returned only when opts.BlockingWait is true.
+var ErrNoAddrAvilable = errors.New("there is no address available")
+
 // simpleBalancer does the bare minimum to expose multiple eps
 // to the grpc reconnection code path
 type simpleBalancer struct {
@@ -162,6 +168,20 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 
 func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
 	var addr string
+
+	// If opts.BlockingWait is false (for fail-fast RPCs), it should return
+	// an address it has notified via Notify immediately instead of blocking.
+	if !opts.BlockingWait {
+		b.mu.RLock()
+		addr = b.pinAddr
+		upEps := len(b.upEps)
+		b.mu.RUnlock()
+		if upEps == 0 {
+			return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable
+		}
+		return grpc.Address{Addr: addr}, func() {}, nil
+	}
+
 	for {
 		b.mu.RLock()
 		ch := b.upc

+ 106 - 0
clientv3/balancer_test.go

@@ -0,0 +1,106 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+var (
+	endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"}
+)
+
+func TestBalancerGetUnblocking(t *testing.T) {
+	sb := newSimpleBalancer(endpoints)
+	unblockingOpts := grpc.BalancerGetOptions{BlockingWait: false}
+
+	_, _, err := sb.Get(context.Background(), unblockingOpts)
+	if err != ErrNoAddrAvilable {
+		t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err)
+	}
+
+	down1 := sb.Up(grpc.Address{Addr: endpoints[1]})
+	down2 := sb.Up(grpc.Address{Addr: endpoints[2]})
+	addrFirst, putFun, err := sb.Get(context.Background(), unblockingOpts)
+	if err != nil {
+		t.Errorf("Get() with up endpoints should sucess, got %v", err)
+	}
+	if addrFirst.Addr != endpoints[1] && addrFirst.Addr != endpoints[2] {
+		t.Errorf("Get() didn't return expected address, got %v", addrFirst)
+	}
+	if putFun == nil {
+		t.Errorf("Get() returned unexpected nil put function")
+	}
+	addrSecond, _, _ := sb.Get(context.Background(), unblockingOpts)
+	if addrSecond.Addr != addrSecond.Addr {
+		t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond)
+	}
+
+	down1(errors.New("error"))
+	down2(errors.New("error"))
+	_, _, err = sb.Get(context.Background(), unblockingOpts)
+	if err != ErrNoAddrAvilable {
+		t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err)
+	}
+}
+
+func TestBalancerGetBlocking(t *testing.T) {
+	sb := newSimpleBalancer(endpoints)
+	blockingOpts := grpc.BalancerGetOptions{BlockingWait: true}
+
+	ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
+	_, _, err := sb.Get(ctx, blockingOpts)
+	if err != context.DeadlineExceeded {
+		t.Errorf("Get() with no up endpoints should timeout, got %v", err)
+	}
+
+	downC := make(chan func(error), 1)
+
+	go func() {
+		// ensure sb.Up() will be called after sb.Get() to see if Up() releases blocking Get()
+		time.Sleep(time.Millisecond * 100)
+		downC <- sb.Up(grpc.Address{Addr: endpoints[1]})
+	}()
+	addrFirst, putFun, err := sb.Get(context.Background(), blockingOpts)
+	if err != nil {
+		t.Errorf("Get() with up endpoints should sucess, got %v", err)
+	}
+	if addrFirst.Addr != endpoints[1] {
+		t.Errorf("Get() didn't return expected address, got %v", addrFirst)
+	}
+	if putFun == nil {
+		t.Errorf("Get() returned unexpected nil put function")
+	}
+	down1 := <-downC
+
+	down2 := sb.Up(grpc.Address{Addr: endpoints[2]})
+	addrSecond, _, _ := sb.Get(context.Background(), blockingOpts)
+	if addrSecond.Addr != addrSecond.Addr {
+		t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond)
+	}
+
+	down1(errors.New("error"))
+	down2(errors.New("error"))
+	ctx, _ = context.WithTimeout(context.Background(), time.Millisecond*100)
+	_, _, err = sb.Get(ctx, blockingOpts)
+	if err != context.DeadlineExceeded {
+		t.Errorf("Get() with no up endpoints should timeout, got %v", err)
+	}
+}

+ 3 - 3
clientv3/cluster.go

@@ -52,7 +52,7 @@ func NewCluster(c *Client) Cluster {
 
 func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
 	r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
-	resp, err := c.remote.MemberAdd(ctx, r)
+	resp, err := c.remote.MemberAdd(ctx, r, grpc.FailFast(false))
 	if err == nil {
 		return (*MemberAddResponse)(resp), nil
 	}
@@ -64,7 +64,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd
 
 func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
 	r := &pb.MemberRemoveRequest{ID: id}
-	resp, err := c.remote.MemberRemove(ctx, r)
+	resp, err := c.remote.MemberRemove(ctx, r, grpc.FailFast(false))
 	if err == nil {
 		return (*MemberRemoveResponse)(resp), nil
 	}
@@ -78,7 +78,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
 	// it is safe to retry on update.
 	for {
 		r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
-		resp, err := c.remote.MemberUpdate(ctx, r)
+		resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false))
 		if err == nil {
 			return (*MemberUpdateResponse)(resp), nil
 		}

+ 2 - 2
clientv3/kv.go

@@ -148,14 +148,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
 	case tPut:
 		var resp *pb.PutResponse
 		r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
-		resp, err = kv.remote.Put(ctx, r)
+		resp, err = kv.remote.Put(ctx, r, grpc.FailFast(false))
 		if err == nil {
 			return OpResponse{put: (*PutResponse)(resp)}, nil
 		}
 	case tDeleteRange:
 		var resp *pb.DeleteRangeResponse
 		r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
-		resp, err = kv.remote.DeleteRange(ctx, r)
+		resp, err = kv.remote.DeleteRange(ctx, r, grpc.FailFast(false))
 		if err == nil {
 			return OpResponse{del: (*DeleteResponse)(resp)}, nil
 		}

+ 3 - 3
clientv3/lease.go

@@ -148,7 +148,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
 
 	for {
 		r := &pb.LeaseGrantRequest{TTL: ttl}
-		resp, err := l.remote.LeaseGrant(cctx, r)
+		resp, err := l.remote.LeaseGrant(cctx, r, grpc.FailFast(false))
 		if err == nil {
 			gresp := &LeaseGrantResponse{
 				ResponseHeader: resp.GetHeader(),
@@ -174,7 +174,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
 
 	for {
 		r := &pb.LeaseRevokeRequest{ID: int64(id)}
-		resp, err := l.remote.LeaseRevoke(cctx, r)
+		resp, err := l.remote.LeaseRevoke(cctx, r, grpc.FailFast(false))
 
 		if err == nil {
 			return (*LeaseRevokeResponse)(resp), nil
@@ -195,7 +195,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
 
 	for {
 		r := toLeaseTimeToLiveRequest(id, opts...)
-		resp, err := l.remote.LeaseTimeToLive(cctx, r)
+		resp, err := l.remote.LeaseTimeToLive(cctx, r, grpc.FailFast(false))
 		if err == nil {
 			gresp := &LeaseTimeToLiveResponse{
 				ResponseHeader: resp.GetHeader(),