Browse Source

Merge pull request #5603 from heyitsanthony/clientv3-close-keepalive

clientv3: close keepalive channel if TTL locally exceeded
Anthony Romano 9 years ago
parent
commit
78c957df41
2 changed files with 164 additions and 11 deletions
  1. 114 0
      clientv3/integration/lease_test.go
  2. 50 11
      clientv3/lease.go

+ 114 - 0
clientv3/integration/lease_test.go

@@ -339,3 +339,117 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
 	case <-donec:
 	}
 }
+
+// TestLeaseKeepAliveCloseAfterDisconnectExpire ensures the keep alive channel is closed
+// following a disconnection, lease revoke, then reconnect.
+func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+
+	// setup lease and do a keepalive
+	resp, err := cli.Grant(context.Background(), 10)
+	if err != nil {
+		t.Fatal(err)
+	}
+	rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
+	if kerr != nil {
+		t.Fatal(kerr)
+	}
+	if kresp := <-rc; kresp.ID != resp.ID {
+		t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID)
+	}
+
+	// keep client disconnected
+	clus.Members[0].Stop(t)
+	time.Sleep(time.Second)
+	clus.WaitLeader(t)
+
+	if _, err := clus.Client(1).Revoke(context.TODO(), resp.ID); err != nil {
+		t.Fatal(err)
+	}
+
+	clus.Members[0].Restart(t)
+
+	select {
+	case ka, ok := <-rc:
+		if ok {
+			t.Fatalf("unexpected keepalive %v", ka)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatalf("keepalive channel did not close")
+	}
+}
+
+// TestLeaseKeepAliveInitTimeout ensures the keep alive channel closes if
+// the initial keep alive request never gets a response.
+func TestLeaseKeepAliveInitTimeout(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+
+	// setup lease and do a keepalive
+	resp, err := cli.Grant(context.Background(), 5)
+	if err != nil {
+		t.Fatal(err)
+	}
+	rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
+	if kerr != nil {
+		t.Fatal(kerr)
+	}
+	// keep client disconnected
+	clus.Members[0].Stop(t)
+	select {
+	case ka, ok := <-rc:
+		if ok {
+			t.Fatalf("unexpected keepalive %v, expected closed channel", ka)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatalf("keepalive channel did not close")
+	}
+
+	clus.Members[0].Restart(t)
+}
+
+// TestLeaseKeepAliveInitTimeout ensures the keep alive channel closes if
+// a keep alive request after the first never gets a response.
+func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+
+	// setup lease and do a keepalive
+	resp, err := cli.Grant(context.Background(), 5)
+	if err != nil {
+		t.Fatal(err)
+	}
+	rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
+	if kerr != nil {
+		t.Fatal(kerr)
+	}
+	if kresp := <-rc; kresp.ID != resp.ID {
+		t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID)
+	}
+
+	// keep client disconnected
+	clus.Members[0].Stop(t)
+	select {
+	case ka, ok := <-rc:
+		if ok {
+			t.Fatalf("unexpected keepalive %v, expected closed channel", ka)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatalf("keepalive channel did not close")
+	}
+
+	clus.Members[0].Restart(t)
+}

+ 50 - 11
clientv3/lease.go

@@ -44,6 +44,9 @@ type LeaseKeepAliveResponse struct {
 }
 
 const (
+	// defaultTTL is the assumed lease TTL used for the first keepalive
+	// deadline before the actual TTL is known to the client.
+	defaultTTL = 5 * time.Second
 	// a small buffer to store unsent lease responses.
 	leaseResponseChSize = 16
 	// NoLease is a lease ID for the absence of a lease.
@@ -84,26 +87,38 @@ type lessor struct {
 	stopCancel context.CancelFunc
 
 	keepAlives map[LeaseID]*keepAlive
+
+	// firstKeepAliveTimeout is the timeout for the first keepalive request
+	// before the actual TTL is known to the lease client
+	firstKeepAliveTimeout time.Duration
 }
 
 // keepAlive multiplexes a keepalive for a lease over multiple channels
 type keepAlive struct {
 	chs  []chan<- *LeaseKeepAliveResponse
 	ctxs []context.Context
-	// deadline is the next time to send a keep alive message
+	// deadline is the time the keep alive channels close if no response
 	deadline time.Time
+	// nextKeepAlive is when to send the next keep alive message
+	nextKeepAlive time.Time
 	// donec is closed on lease revoke, expiration, or cancel.
 	donec chan struct{}
 }
 
 func NewLease(c *Client) Lease {
 	l := &lessor{
-		donec:      make(chan struct{}),
-		keepAlives: make(map[LeaseID]*keepAlive),
-		remote:     pb.NewLeaseClient(c.conn),
+		donec:                 make(chan struct{}),
+		keepAlives:            make(map[LeaseID]*keepAlive),
+		remote:                pb.NewLeaseClient(c.conn),
+		firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
+	}
+	if l.firstKeepAliveTimeout == time.Second {
+		l.firstKeepAliveTimeout = defaultTTL
 	}
+
 	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
 	go l.recvKeepAliveLoop()
+	go l.deadlineLoop()
 	return l
 }
 
@@ -162,10 +177,11 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl
 	if !ok {
 		// create fresh keep alive
 		ka = &keepAlive{
-			chs:      []chan<- *LeaseKeepAliveResponse{ch},
-			ctxs:     []context.Context{ctx},
-			deadline: time.Now(),
-			donec:    make(chan struct{}),
+			chs:           []chan<- *LeaseKeepAliveResponse{ch},
+			ctxs:          []context.Context{ctx},
+			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
+			nextKeepAlive: time.Now(),
+			donec:         make(chan struct{}),
 		}
 		l.keepAlives[id] = ka
 	} else {
@@ -327,16 +343,39 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 	}
 
 	// send update to all channels
-	nextDeadline := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
+	nextKeepAlive := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
+	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
 	for _, ch := range ka.chs {
 		select {
 		case ch <- karesp:
-			ka.deadline = nextDeadline
+			ka.nextKeepAlive = nextKeepAlive
 		default:
 		}
 	}
 }
 
+// deadlineLoop reaps any keep alive channels that have not recieved a resposne within
+// the lease TTL
+func (l *lessor) deadlineLoop() {
+	for {
+		select {
+		case <-time.After(time.Second):
+		case <-l.donec:
+			return
+		}
+		now := time.Now()
+		l.mu.Lock()
+		for id, ka := range l.keepAlives {
+			if ka.deadline.Before(now) {
+				// waited too long for response; lease may be expired
+				ka.Close()
+				delete(l.keepAlives, id)
+			}
+		}
+		l.mu.Unlock()
+	}
+}
+
 // sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream
 func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 	for {
@@ -355,7 +394,7 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 		now := time.Now()
 		l.mu.Lock()
 		for id, ka := range l.keepAlives {
-			if ka.deadline.Before(now) {
+			if ka.nextKeepAlive.Before(now) {
 				tosend = append(tosend, id)
 			}
 		}