Browse Source

clientv3: separate readyWait for ConnectNotify

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Gyu-Ho Lee 8 years ago
parent
commit
2540859ee7
2 changed files with 32 additions and 14 deletions
  1. 30 0
      clientv3/ready_wait.go
  2. 2 14
      clientv3/retry.go

+ 30 - 0
clientv3/ready_wait.go

@@ -0,0 +1,30 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import "context"
+
+// TODO: remove this when "FailFast=false" is fixed.
+// See https://github.com/grpc/grpc-go/issues/1532.
+func readyWait(rpcCtx, clientCtx context.Context, ready <-chan struct{}) error {
+	select {
+	case <-ready:
+		return nil
+	case <-rpcCtx.Done():
+		return rpcCtx.Err()
+	case <-clientCtx.Done():
+		return clientCtx.Err()
+	}
+}

+ 2 - 14
clientv3/retry.go

@@ -51,12 +51,8 @@ func isWriteStopError(err error) bool {
 func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc {
 	return func(rpcCtx context.Context, f rpcFunc) error {
 		for {
-			select {
-			case <-c.balancer.ConnectNotify():
-			case <-rpcCtx.Done():
-				return rpcCtx.Err()
-			case <-c.ctx.Done():
-				return c.ctx.Err()
+			if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil {
+				return err
 			}
 			pinned := c.balancer.pinned()
 			err := f(rpcCtx)
@@ -68,20 +64,12 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc {
 			}
 			// mark this before endpoint switch is triggered
 			c.balancer.hostPortError(pinned, err)
-			notify := c.balancer.ConnectNotify()
 			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
 				c.balancer.next()
 			}
 			if isStop(err) {
 				return err
 			}
-			select {
-			case <-notify:
-			case <-rpcCtx.Done():
-				return rpcCtx.Err()
-			case <-c.ctx.Done():
-				return c.ctx.Err()
-			}
 		}
 	}
 }