Ver código fonte

clientv3: close keep alive channel if no response within TTL

Anthony Romano 9 anos atrás
pai
commit
e534532523
1 arquivos alterados com 50 adições e 11 exclusões
  1. 50 11
      clientv3/lease.go

+ 50 - 11
clientv3/lease.go

@@ -44,6 +44,9 @@ type LeaseKeepAliveResponse struct {
 }
 
 const (
+	// defaultTTL is the assumed lease TTL used for the first keepalive
+	// deadline before the actual TTL is known to the client.
+	defaultTTL = 5 * time.Second
 	// a small buffer to store unsent lease responses.
 	leaseResponseChSize = 16
 	// NoLease is a lease ID for the absence of a lease.
@@ -84,26 +87,38 @@ type lessor struct {
 	stopCancel context.CancelFunc
 
 	keepAlives map[LeaseID]*keepAlive
+
+	// firstKeepAliveTimeout is the timeout for the first keepalive request
+	// before the actual TTL is known to the lease client
+	firstKeepAliveTimeout time.Duration
 }
 
 // keepAlive multiplexes a keepalive for a lease over multiple channels
 type keepAlive struct {
 	chs  []chan<- *LeaseKeepAliveResponse
 	ctxs []context.Context
-	// deadline is the next time to send a keep alive message
+	// deadline is the time the keep alive channels close if no response
 	deadline time.Time
+	// nextKeepAlive is when to send the next keep alive message
+	nextKeepAlive time.Time
 	// donec is closed on lease revoke, expiration, or cancel.
 	donec chan struct{}
 }
 
 func NewLease(c *Client) Lease {
 	l := &lessor{
-		donec:      make(chan struct{}),
-		keepAlives: make(map[LeaseID]*keepAlive),
-		remote:     pb.NewLeaseClient(c.conn),
+		donec:                 make(chan struct{}),
+		keepAlives:            make(map[LeaseID]*keepAlive),
+		remote:                pb.NewLeaseClient(c.conn),
+		firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
+	}
+	if l.firstKeepAliveTimeout == time.Second {
+		l.firstKeepAliveTimeout = defaultTTL
 	}
+
 	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
 	go l.recvKeepAliveLoop()
+	go l.deadlineLoop()
 	return l
 }
 
@@ -162,10 +177,11 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl
 	if !ok {
 		// create fresh keep alive
 		ka = &keepAlive{
-			chs:      []chan<- *LeaseKeepAliveResponse{ch},
-			ctxs:     []context.Context{ctx},
-			deadline: time.Now(),
-			donec:    make(chan struct{}),
+			chs:           []chan<- *LeaseKeepAliveResponse{ch},
+			ctxs:          []context.Context{ctx},
+			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
+			nextKeepAlive: time.Now(),
+			donec:         make(chan struct{}),
 		}
 		l.keepAlives[id] = ka
 	} else {
@@ -327,16 +343,39 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 	}
 
 	// send update to all channels
-	nextDeadline := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
+	nextKeepAlive := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
+	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
 	for _, ch := range ka.chs {
 		select {
 		case ch <- karesp:
-			ka.deadline = nextDeadline
+			ka.nextKeepAlive = nextKeepAlive
 		default:
 		}
 	}
 }
 
+// deadlineLoop reaps any keep alive channels that have not recieved a resposne within
+// the lease TTL
+func (l *lessor) deadlineLoop() {
+	for {
+		select {
+		case <-time.After(time.Second):
+		case <-l.donec:
+			return
+		}
+		now := time.Now()
+		l.mu.Lock()
+		for id, ka := range l.keepAlives {
+			if ka.deadline.Before(now) {
+				// waited too long for response; lease may be expired
+				ka.Close()
+				delete(l.keepAlives, id)
+			}
+		}
+		l.mu.Unlock()
+	}
+}
+
 // sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream
 func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 	for {
@@ -355,7 +394,7 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 		now := time.Now()
 		l.mu.Lock()
 		for id, ka := range l.keepAlives {
-			if ka.deadline.Before(now) {
+			if ka.nextKeepAlive.Before(now) {
 				tosend = append(tosend, id)
 			}
 		}