Browse Source

Merge pull request #4382 from xiang90/lease_keep_test

clientv3/integration: test lease keepalive
Xiang Li 10 years ago
parent
commit
20673e384a
2 changed files with 116 additions and 1 deletions
  1. 110 1
      clientv3/integration/lease_test.go
  2. 6 0
      clientv3/lease.go

+ 110 - 1
clientv3/integration/lease_test.go

@@ -16,6 +16,7 @@ package integration
 
 import (
 	"testing"
+	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/clientv3"
@@ -50,7 +51,7 @@ func TestLeaseCreate(t *testing.T) {
 func TestLeaseRevoke(t *testing.T) {
 	defer testutil.AfterTest(t)
 
-	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
 	lapi := clientv3.NewLease(clus.RandClient())
@@ -73,3 +74,111 @@ func TestLeaseRevoke(t *testing.T) {
 		t.Fatalf("err = %v, want %v", err, v3rpc.ErrLeaseNotFound)
 	}
 }
+
+func TestLeaseKeepAliveOnce(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	lapi := clientv3.NewLease(clus.RandClient())
+	defer lapi.Close()
+
+	resp, err := lapi.Create(context.Background(), 10)
+	if err != nil {
+		t.Errorf("failed to create lease %v", err)
+	}
+
+	_, err = lapi.KeepAliveOnce(context.Background(), lease.LeaseID(resp.ID))
+	if err != nil {
+		t.Errorf("failed to keepalive lease", err)
+	}
+}
+
+func TestLeaseKeepAlive(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	lapi := clientv3.NewLease(clus.RandClient())
+
+	resp, err := lapi.Create(context.Background(), 10)
+	if err != nil {
+		t.Errorf("failed to create lease %v", err)
+	}
+
+	rc, kerr := lapi.KeepAlive(context.Background(), lease.LeaseID(resp.ID))
+	if kerr != nil {
+		t.Errorf("failed to keepalive lease %v", kerr)
+	}
+
+	kresp, ok := <-rc
+	if !ok {
+		t.Errorf("chan is closed, want not closed")
+	}
+
+	if kresp.ID != resp.ID {
+		t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
+	}
+
+	lapi.Close()
+
+	_, ok = <-rc
+	if ok {
+		t.Errorf("chan is not closed, want lease Close() closes chan")
+	}
+}
+
+// TODO: add a client that can connect to all the members of cluster via unix sock.
+// TODO: test handle more complicated failures.
+func TestLeaseKeepAliveHandleFailure(t *testing.T) {
+	t.Skip("test it when we have a cluster client")
+
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	// TODO: change this line to get a cluster client
+	lapi := clientv3.NewLease(clus.RandClient())
+
+	resp, err := lapi.Create(context.Background(), 10)
+	if err != nil {
+		t.Errorf("failed to create lease %v", err)
+	}
+
+	rc, kerr := lapi.KeepAlive(context.Background(), lease.LeaseID(resp.ID))
+	if kerr != nil {
+		t.Errorf("failed to keepalive lease %v", kerr)
+	}
+
+	kresp := <-rc
+	if kresp.ID != resp.ID {
+		t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
+	}
+
+	// restart the connected member.
+	clus.Members[0].Stop(t)
+
+	select {
+	case <-rc:
+		t.Fatalf("unexpected keepalive")
+	case <-time.After(10*time.Second/3 + 1):
+	}
+
+	// recover the member.
+	clus.Members[0].Restart(t)
+
+	kresp = <-rc
+	if kresp.ID != resp.ID {
+		t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
+	}
+
+	lapi.Close()
+
+	_, ok := <-rc
+	if ok {
+		t.Errorf("chan is not closed, want lease Close() closes chan")
+	}
+}

+ 6 - 0
clientv3/lease.go

@@ -206,12 +206,18 @@ func (l *lessor) recvKeepAliveLoop() {
 	defer func() {
 		l.stopCancel()
 		close(l.donec)
+		for _, ch := range l.keepAlives {
+			close(ch)
+		}
 	}()
 
 	stream, serr := l.resetRecv()
 	for {
 		resp, err := stream.Recv()
 		if err != nil {
+			if isRPCError(err) {
+				return
+			}
 			if stream, serr = l.resetRecv(); serr != nil {
 				return
 			}