Browse Source

Merge pull request #7890 from yudai/keep_ka_loop_running

clientv3: Do no stop keep alive loop by server side errors
Xiang Li 8 years ago
parent
commit
b8875515a4
2 changed files with 38 additions and 19 deletions
  1. 8 0
      clientv3/client.go
  2. 30 19
      clientv3/lease.go

+ 8 - 0
clientv3/client.go

@@ -503,3 +503,11 @@ func toErr(ctx context.Context, err error) error {
 	}
 	return err
 }
+
+func canceledByCaller(stopCtx context.Context, err error) bool {
+	if stopCtx.Err() == nil || err == nil {
+		return false
+	}
+
+	return err == context.Canceled || err == context.DeadlineExceeded
+}

+ 30 - 19
clientv3/lease.go

@@ -69,7 +69,7 @@ const (
 	// NoLease is a lease ID for the absence of a lease.
 	NoLease LeaseID = 0
 
-	// retryConnWait is how long to wait before retrying on a lost leader
+	// retryConnWait is how long to wait before retrying request due to an error
 	retryConnWait = 500 * time.Millisecond
 )
 
@@ -392,34 +392,45 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 		l.mu.Unlock()
 	}()
 
-	stream, serr := l.resetRecv()
-	for serr == nil {
-		resp, err := stream.Recv()
-		if err == nil {
-			l.recvKeepAlive(resp)
-			continue
-		}
-		err = toErr(l.stopCtx, err)
-		if err == rpctypes.ErrNoLeader {
-			l.closeRequireLeader()
-			select {
-			case <-time.After(retryConnWait):
-			case <-l.stopCtx.Done():
+	for {
+		stream, err := l.resetRecv()
+		if err != nil {
+			if canceledByCaller(l.stopCtx, err) {
 				return err
 			}
-		} else if isHaltErr(l.stopCtx, err) {
-			return err
+		} else {
+			for {
+				resp, err := stream.Recv()
+
+				if err != nil {
+					if canceledByCaller(l.stopCtx, err) {
+						return err
+					}
+
+					if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
+						l.closeRequireLeader()
+					}
+					break
+				}
+
+				l.recvKeepAlive(resp)
+			}
+		}
+
+		select {
+		case <-time.After(retryConnWait):
+			continue
+		case <-l.stopCtx.Done():
+			return l.stopCtx.Err()
 		}
-		stream, serr = l.resetRecv()
 	}
-	return serr
 }
 
 // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
 func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
 	sctx, cancel := context.WithCancel(l.stopCtx)
 	stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
-	if err = toErr(sctx, err); err != nil {
+	if err != nil {
 		cancel()
 		return nil, err
 	}