Browse Source

Merge pull request #7148 from heyitsanthony/fix-lease-overlap

clientv3: don't reset stream on keepaliveonce or revoke failure
Anthony Romano 9 years ago
parent
commit
7dfe503f1c
1 changed files with 15 additions and 35 deletions
  1. 15 35
      clientv3/lease.go

+ 15 - 35
clientv3/lease.go

@@ -195,9 +195,6 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
 		if isHaltErr(ctx, err) {
 			return nil, toErr(ctx, err)
 		}
-		if nerr := l.newStream(); nerr != nil {
-			return nil, nerr
-		}
 	}
 }
 
@@ -277,10 +274,6 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
 		if isHaltErr(ctx, err) {
 			return nil, toErr(ctx, err)
 		}
-
-		if nerr := l.newStream(); nerr != nil {
-			return nil, nerr
-		}
 	}
 }
 
@@ -378,10 +371,23 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 
 // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
 func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
-	if err := l.newStream(); err != nil {
+	sctx, cancel := context.WithCancel(l.stopCtx)
+	stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
+	if err = toErr(sctx, err); err != nil {
+		cancel()
 		return nil, err
 	}
-	stream := l.getKeepAliveStream()
+
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if l.stream != nil && l.streamCancel != nil {
+		l.stream.CloseSend()
+		l.streamCancel()
+	}
+
+	l.streamCancel = cancel
+	l.stream = stream
+
 	go l.sendKeepAliveLoop(stream)
 	return stream, nil
 }
@@ -477,32 +483,6 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 	}
 }
 
-func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
-	l.mu.Lock()
-	defer l.mu.Unlock()
-	return l.stream
-}
-
-func (l *lessor) newStream() error {
-	sctx, cancel := context.WithCancel(l.stopCtx)
-	stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
-	if err != nil {
-		cancel()
-		return toErr(sctx, err)
-	}
-
-	l.mu.Lock()
-	defer l.mu.Unlock()
-	if l.stream != nil && l.streamCancel != nil {
-		l.stream.CloseSend()
-		l.streamCancel()
-	}
-
-	l.streamCancel = cancel
-	l.stream = stream
-	return nil
-}
-
 func (ka *keepAlive) Close() {
 	close(ka.donec)
 	for _, ch := range ka.chs {