Browse Source

clientv3: synchronous lease Close

Anthony Romano 10 years ago
parent
commit
eb8ab3ace4
1 changed files with 51 additions and 72 deletions
  1. 51 72
      clientv3/lease.go

+ 51 - 72
clientv3/lease.go

@@ -57,9 +57,11 @@ type Lease interface {
 type lessor struct {
 	c *Client
 
-	mu      sync.Mutex       // guards all fields
-	conn    *grpc.ClientConn // conn in-use
-	initedc chan bool
+	mu   sync.Mutex       // guards all fields
+	conn *grpc.ClientConn // conn in-use
+
+	// donec is closed when recvKeepAliveLoop stops
+	donec chan struct{}
 
 	remote pb.LeaseClient
 
@@ -78,8 +80,7 @@ func NewLease(c *Client) Lease {
 		c:    c,
 		conn: c.ActiveConnection(),
 
-		initedc: make(chan bool, 1),
-
+		donec:      make(chan struct{}),
 		keepAlives: make(map[lease.LeaseID]chan *LeaseKeepAliveResponse),
 		deadlines:  make(map[lease.LeaseID]time.Time),
 	}
@@ -87,10 +88,7 @@ func NewLease(c *Client) Lease {
 	l.remote = pb.NewLeaseClient(l.conn)
 	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
 
-	l.initedc <- false
-
 	go l.recvKeepAliveLoop()
-	go l.sendKeepAliveLoop()
 
 	return l
 }
@@ -181,11 +179,8 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKee
 }
 
 func (l *lessor) Close() error {
-	l.mu.Lock()
-	defer l.mu.Unlock()
-
 	l.stopCancel()
-	l.stream = nil
+	<-l.donec
 	return nil
 }
 
@@ -208,56 +203,66 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKee
 }
 
 func (l *lessor) recvKeepAliveLoop() {
-	if !l.initStream() {
-		l.Close()
-		return
-	}
+	defer func() {
+		l.stopCancel()
+		close(l.donec)
+	}()
 
+	stream, serr := l.resetRecv()
 	for {
-		stream := l.getKeepAliveStream()
-
 		resp, err := stream.Recv()
 		if err != nil {
-			err = l.switchRemoteAndStream(err)
-			if err != nil {
-				l.Close()
+			if stream, serr = l.resetRecv(); serr != nil {
 				return
 			}
 			continue
 		}
+		l.recvKeepAlive(resp)
+	}
+}
 
-		l.mu.Lock()
-		lch, ok := l.keepAlives[lease.LeaseID(resp.ID)]
-		if !ok {
-			l.mu.Unlock()
-			continue
-		}
-
-		if resp.TTL <= 0 {
-			close(lch)
-			delete(l.deadlines, lease.LeaseID(resp.ID))
-			delete(l.keepAlives, lease.LeaseID(resp.ID))
-		} else {
-			select {
-			case lch <- (*LeaseKeepAliveResponse)(resp):
-				l.deadlines[lease.LeaseID(resp.ID)] =
-					time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second)
-			default:
-			}
-		}
-		l.mu.Unlock()
+// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
+func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
+	if err := l.switchRemoteAndStream(nil); err != nil {
+		return nil, err
 	}
+	stream := l.getKeepAliveStream()
+	go l.sendKeepAliveLoop(stream)
+	return stream, nil
 }
 
-func (l *lessor) sendKeepAliveLoop() {
-	if !l.initStream() {
-		l.Close()
+// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
+func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	lch, ok := l.keepAlives[lease.LeaseID(resp.ID)]
+	if !ok {
+		return
+	}
+
+	if resp.TTL <= 0 {
+		close(lch)
+		delete(l.deadlines, lease.LeaseID(resp.ID))
+		delete(l.keepAlives, lease.LeaseID(resp.ID))
 		return
 	}
 
+	select {
+	case lch <- (*LeaseKeepAliveResponse)(resp):
+		l.deadlines[lease.LeaseID(resp.ID)] =
+			time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second)
+	default:
+	}
+}
+
+// sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream
+func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 	for {
 		select {
 		case <-time.After(500 * time.Millisecond):
+		case <-l.donec:
+			return
 		case <-l.stopCtx.Done():
 			return
 		}
@@ -273,21 +278,10 @@ func (l *lessor) sendKeepAliveLoop() {
 		}
 		l.mu.Unlock()
 
-		stream := l.getKeepAliveStream()
-
-		var err error
 		for _, id := range tosend {
 			r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
-			err = stream.Send(r)
-			if err != nil {
-				break
-			}
-		}
-
-		if err != nil {
-			err = l.switchRemoteAndStream(err)
-			if err != nil {
-				l.Close()
+			if err := stream.Send(r); err != nil {
+				// TODO do something with this error?
 				return
 			}
 		}
@@ -359,21 +353,6 @@ func (l *lessor) newStream() error {
 	return nil
 }
 
-func (l *lessor) initStream() bool {
-	ok := <-l.initedc
-	if ok {
-		return true
-	}
-
-	err := l.switchRemoteAndStream(nil)
-	if err == nil {
-		l.initedc <- true
-		return true
-	}
-	l.initedc <- false
-	return false
-}
-
 // cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done
 // should be closed when the work is finished. When done fires, cancelWhenStop will release
 // its internal resource.