Browse Source

Merge pull request #4574 from heyitsanthony/clientv3-lease-ctx

clientv3: support context cancellation on lease keep alives
Anthony Romano 9 năm trước cách đây
mục cha
commit
f8c998906c
1 tập tin đã thay đổi với 89 bổ sung31 xóa
  1. 89 31
      clientv3/lease.go

+ 89 - 31
clientv3/lease.go

@@ -71,8 +71,17 @@ type lessor struct {
 	stopCtx    context.Context
 	stopCancel context.CancelFunc
 
-	keepAlives map[lease.LeaseID]chan *LeaseKeepAliveResponse
-	deadlines  map[lease.LeaseID]time.Time
+	keepAlives map[lease.LeaseID]*keepAlive
+}
+
+// keepAlive multiplexes a keepalive for a lease over multiple channels
+type keepAlive struct {
+	chs  []chan<- *LeaseKeepAliveResponse
+	ctxs []context.Context
+	// deadline is the next time to send a keep alive message
+	deadline time.Time
+	// donec is closed on lease revoke, expiration, or cancel.
+	donec chan struct{}
 }
 
 func NewLease(c *Client) Lease {
@@ -81,8 +90,7 @@ func NewLease(c *Client) Lease {
 		conn: c.ActiveConnection(),
 
 		donec:      make(chan struct{}),
-		keepAlives: make(map[lease.LeaseID]chan *LeaseKeepAliveResponse),
-		deadlines:  make(map[lease.LeaseID]time.Time),
+		keepAlives: make(map[lease.LeaseID]*keepAlive),
 	}
 
 	l.remote = pb.NewLeaseClient(l.conn)
@@ -138,26 +146,29 @@ func (l *lessor) Revoke(ctx context.Context, id lease.LeaseID) (*LeaseRevokeResp
 }
 
 func (l *lessor) KeepAlive(ctx context.Context, id lease.LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
-	lc := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
-
-	// todo: add concellation based on the passed in ctx
+	ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
 
 	l.mu.Lock()
-	_, ok := l.keepAlives[id]
+	ka, ok := l.keepAlives[id]
 	if !ok {
-		l.keepAlives[id] = lc
-		l.deadlines[id] = time.Now()
-		l.mu.Unlock()
-		return lc, nil
+		// create fresh keep alive
+		ka = &keepAlive{
+			chs:      []chan<- *LeaseKeepAliveResponse{ch},
+			ctxs:     []context.Context{ctx},
+			deadline: time.Now(),
+			donec:    make(chan struct{}),
+		}
+		l.keepAlives[id] = ka
+	} else {
+		// add channel and context to existing keep alive
+		ka.ctxs = append(ka.ctxs, ctx)
+		ka.chs = append(ka.chs, ch)
 	}
 	l.mu.Unlock()
 
-	resp, err := l.KeepAliveOnce(ctx, id)
-	if err != nil {
-		return nil, err
-	}
-	lc <- resp
-	return lc, nil
+	go l.keepAliveCtxCloser(id, ctx, ka.donec)
+
+	return ch, nil
 }
 
 func (l *lessor) KeepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error) {
@@ -184,6 +195,38 @@ func (l *lessor) Close() error {
 	return nil
 }
 
+func (l *lessor) keepAliveCtxCloser(id lease.LeaseID, ctx context.Context, donec <-chan struct{}) {
+	select {
+	case <-donec:
+		return
+	case <-l.donec:
+		return
+	case <-ctx.Done():
+	}
+
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	ka, ok := l.keepAlives[id]
+	if !ok {
+		return
+	}
+
+	// close channel and remove context if still associated with keep alive
+	for i, c := range ka.ctxs {
+		if c == ctx {
+			close(ka.chs[i])
+			ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
+			ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
+			break
+		}
+	}
+	// remove if no one more listeners
+	if len(ka.chs) == 0 {
+		delete(l.keepAlives, id)
+	}
+}
+
 func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error) {
 	stream, err := l.getRemote().LeaseKeepAlive(ctx)
 	if err != nil {
@@ -205,10 +248,13 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKee
 func (l *lessor) recvKeepAliveLoop() {
 	defer func() {
 		l.stopCancel()
+		l.mu.Lock()
 		close(l.donec)
-		for _, ch := range l.keepAlives {
-			close(ch)
+		for _, ka := range l.keepAlives {
+			ka.Close()
 		}
+		l.keepAlives = make(map[lease.LeaseID]*keepAlive)
+		l.mu.Unlock()
 	}()
 
 	stream, serr := l.resetRecv()
@@ -239,26 +285,31 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
 
 // recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
 func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
+	id := lease.LeaseID(resp.ID)
+
 	l.mu.Lock()
 	defer l.mu.Unlock()
 
-	lch, ok := l.keepAlives[lease.LeaseID(resp.ID)]
+	ka, ok := l.keepAlives[id]
 	if !ok {
 		return
 	}
 
 	if resp.TTL <= 0 {
-		close(lch)
-		delete(l.deadlines, lease.LeaseID(resp.ID))
-		delete(l.keepAlives, lease.LeaseID(resp.ID))
+		// lease expired; close all keep alive channels
+		delete(l.keepAlives, id)
+		ka.Close()
 		return
 	}
 
-	select {
-	case lch <- (*LeaseKeepAliveResponse)(resp):
-		l.deadlines[lease.LeaseID(resp.ID)] =
-			time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second)
-	default:
+	// send update to all channels
+	nextDeadline := time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second)
+	for _, ch := range ka.chs {
+		select {
+		case ch <- (*LeaseKeepAliveResponse)(resp):
+			ka.deadline = nextDeadline
+		default:
+		}
 	}
 }
 
@@ -277,8 +328,8 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 
 		now := time.Now()
 		l.mu.Lock()
-		for id, d := range l.deadlines {
-			if d.Before(now) {
+		for id, ka := range l.keepAlives {
+			if ka.deadline.Before(now) {
 				tosend = append(tosend, id)
 			}
 		}
@@ -359,6 +410,13 @@ func (l *lessor) newStream() error {
 	return nil
 }
 
+func (ka *keepAlive) Close() {
+	close(ka.donec)
+	for _, ch := range ka.chs {
+		close(ch)
+	}
+}
+
 // cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done
 // should be closed when the work is finished. When done fires, cancelWhenStop will release
 // its internal resource.