瀏覽代碼

Merge pull request #7285 from fanminshi/uses_direct_client_call_for_tests

clientv3: integration test uses direct client calls
fanmin shi 8 年之前
父節點
當前提交
9b72c8ba1b

+ 4 - 5
clientv3/integration/cluster_test.go

@@ -18,7 +18,6 @@ import (
 	"reflect"
 	"testing"
 
-	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/integration"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
@@ -31,7 +30,7 @@ func TestMemberList(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	capi := clientv3.NewCluster(clus.RandClient())
+	capi := clus.RandClient()
 
 	resp, err := capi.MemberList(context.Background())
 	if err != nil {
@@ -49,7 +48,7 @@ func TestMemberAdd(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	capi := clientv3.NewCluster(clus.RandClient())
+	capi := clus.RandClient()
 
 	urls := []string{"http://127.0.0.1:1234"}
 	resp, err := capi.MemberAdd(context.Background(), urls)
@@ -68,7 +67,7 @@ func TestMemberRemove(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	capi := clientv3.NewCluster(clus.Client(1))
+	capi := clus.Client(1)
 	resp, err := capi.MemberList(context.Background())
 	if err != nil {
 		t.Fatalf("failed to list member %v", err)
@@ -106,7 +105,7 @@ func TestMemberUpdate(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	capi := clientv3.NewCluster(clus.RandClient())
+	capi := clus.RandClient()
 	resp, err := capi.MemberList(context.Background())
 	if err != nil {
 		t.Fatalf("failed to list member %v", err)

+ 18 - 24
clientv3/integration/kv_test.go

@@ -42,7 +42,7 @@ func TestKVPutError(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, QuotaBackendBytes: quota})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 	ctx := context.TODO()
 
 	_, err := kv.Put(ctx, "", "bar")
@@ -74,10 +74,9 @@ func TestKVPut(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lapi := clientv3.NewLease(clus.RandClient())
-	defer lapi.Close()
+	lapi := clus.RandClient()
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 	ctx := context.TODO()
 
 	resp, err := lapi.Grant(context.Background(), 10)
@@ -120,7 +119,7 @@ func TestKVPutWithIgnoreValue(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 
 	_, err := kv.Put(context.TODO(), "foo", "", clientv3.WithIgnoreValue())
 	if err != rpctypes.ErrKeyNotFound {
@@ -153,10 +152,9 @@ func TestKVPutWithIgnoreLease(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 
-	lapi := clientv3.NewLease(clus.RandClient())
-	defer lapi.Close()
+	lapi := clus.RandClient()
 
 	resp, err := lapi.Grant(context.Background(), 10)
 	if err != nil {
@@ -203,7 +201,7 @@ func TestKVPutWithRequireLeader(t *testing.T) {
 	)
 	time.Sleep(time.Duration(3*electionTicks) * tickDuration)
 
-	kv := clientv3.NewKV(clus.Client(0))
+	kv := clus.Client(0)
 	_, err := kv.Put(clientv3.WithRequireLeader(context.Background()), "foo", "bar")
 	if err != rpctypes.ErrNoLeader {
 		t.Fatal(err)
@@ -223,7 +221,7 @@ func TestKVRange(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 	ctx := context.TODO()
 
 	keySet := []string{"a", "b", "c", "c", "c", "foo", "foo/abc", "fop"}
@@ -408,12 +406,11 @@ func TestKVGetErrConnClosed(t *testing.T) {
 	defer clus.Terminate(t)
 
 	cli := clus.Client(0)
-	kv := clientv3.NewKV(cli)
 
 	donec := make(chan struct{})
 	go func() {
 		defer close(donec)
-		_, err := kv.Get(context.TODO(), "foo")
+		_, err := cli.Get(context.TODO(), "foo")
 		if err != nil && err != grpc.ErrClientConnClosing {
 			t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
 		}
@@ -445,8 +442,7 @@ func TestKVNewAfterClose(t *testing.T) {
 
 	donec := make(chan struct{})
 	go func() {
-		kv := clientv3.NewKV(cli)
-		if _, err := kv.Get(context.TODO(), "foo"); err != grpc.ErrClientConnClosing {
+		if _, err := cli.Get(context.TODO(), "foo"); err != grpc.ErrClientConnClosing {
 			t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
 		}
 		close(donec)
@@ -464,7 +460,7 @@ func TestKVDeleteRange(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 	ctx := context.TODO()
 
 	tests := []struct {
@@ -536,7 +532,7 @@ func TestKVDelete(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 	ctx := context.TODO()
 
 	presp, err := kv.Put(ctx, "foo", "")
@@ -568,7 +564,7 @@ func TestKVCompactError(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 	ctx := context.TODO()
 
 	for i := 0; i < 5; i++ {
@@ -598,7 +594,7 @@ func TestKVCompact(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 	ctx := context.TODO()
 
 	for i := 0; i < 10; i++ {
@@ -620,9 +616,7 @@ func TestKVCompact(t *testing.T) {
 	// new watcher could precede receiving the compaction without quorum first
 	wcli.Get(ctx, "quorum-get")
 
-	wc := clientv3.NewWatcher(wcli)
-	defer wc.Close()
-	wchan := wc.Watch(ctx, "foo", clientv3.WithRev(3))
+	wchan := wcli.Watch(ctx, "foo", clientv3.WithRev(3))
 
 	if wr := <-wchan; wr.CompactRevision != 7 {
 		t.Fatalf("wchan CompactRevision got %v, want 7", wr.CompactRevision)
@@ -649,7 +643,7 @@ func TestKVGetRetry(t *testing.T) {
 	// could give no other endpoints for client reconnection
 	fIdx := (clus.WaitLeader(t) + 1) % clusterSize
 
-	kv := clientv3.NewKV(clus.Client(fIdx))
+	kv := clus.Client(fIdx)
 	ctx := context.TODO()
 
 	if _, err := kv.Put(ctx, "foo", "bar"); err != nil {
@@ -697,7 +691,7 @@ func TestKVPutFailGetRetry(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.Client(0))
+	kv := clus.Client(0)
 	clus.Members[0].Stop(t)
 
 	ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
@@ -738,7 +732,7 @@ func TestKVGetCancel(t *testing.T) {
 	defer clus.Terminate(t)
 
 	oldconn := clus.Client(0).ActiveConnection()
-	kv := clientv3.NewKV(clus.Client(0))
+	kv := clus.Client(0)
 
 	ctx, cancel := context.WithCancel(context.TODO())
 	cancel()

+ 18 - 26
clientv3/integration/lease_test.go

@@ -36,10 +36,7 @@ func TestLeaseNotFoundError(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lapi := clientv3.NewLease(clus.RandClient())
-	defer lapi.Close()
-
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 
 	_, err := kv.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(500)))
 	if err != rpctypes.ErrLeaseNotFound {
@@ -53,10 +50,9 @@ func TestLeaseGrant(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lapi := clientv3.NewLease(clus.RandClient())
-	defer lapi.Close()
+	lapi := clus.RandClient()
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 
 	resp, err := lapi.Grant(context.Background(), 10)
 	if err != nil {
@@ -75,10 +71,9 @@ func TestLeaseRevoke(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lapi := clientv3.NewLease(clus.RandClient())
-	defer lapi.Close()
+	lapi := clus.RandClient()
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 
 	resp, err := lapi.Grant(context.Background(), 10)
 	if err != nil {
@@ -102,8 +97,7 @@ func TestLeaseKeepAliveOnce(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lapi := clientv3.NewLease(clus.RandClient())
-	defer lapi.Close()
+	lapi := clus.RandClient()
 
 	resp, err := lapi.Grant(context.Background(), 10)
 	if err != nil {
@@ -127,7 +121,8 @@ func TestLeaseKeepAlive(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lapi := clientv3.NewLease(clus.RandClient())
+	lapi := clus.Client(0)
+	clus.TakeClient(0)
 
 	resp, err := lapi.Grant(context.Background(), 10)
 	if err != nil {
@@ -167,7 +162,7 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) {
 	defer clus.Terminate(t)
 
 	// TODO: change this line to get a cluster client
-	lapi := clientv3.NewLease(clus.RandClient())
+	lapi := clus.RandClient()
 
 	resp, err := lapi.Grant(context.Background(), 10)
 	if err != nil {
@@ -261,12 +256,12 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
 	defer clus.Terminate(t)
 
 	cli := clus.Client(0)
-	le := clientv3.NewLease(cli)
+	clus.TakeClient(0)
 
 	donec := make(chan struct{})
 	go func() {
 		defer close(donec)
-		_, err := le.Grant(context.TODO(), 5)
+		_, err := cli.Grant(context.TODO(), 5)
 		if err != nil && err != grpc.ErrClientConnClosing {
 			t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
 		}
@@ -275,7 +270,6 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
 	if err := cli.Close(); err != nil {
 		t.Fatal(err)
 	}
-	clus.TakeClient(0)
 
 	select {
 	case <-time.After(3 * time.Second):
@@ -298,8 +292,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {
 
 	donec := make(chan struct{})
 	go func() {
-		le := clientv3.NewLease(cli)
-		if _, err := le.Grant(context.TODO(), 5); err != grpc.ErrClientConnClosing {
+		if _, err := cli.Grant(context.TODO(), 5); err != grpc.ErrClientConnClosing {
 			t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
 		}
 		close(donec)
@@ -318,8 +311,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
 	defer clus.Terminate(t)
 
 	cli := clus.Client(0)
-	le := clientv3.NewLease(cli)
-	resp, err := le.Grant(context.TODO(), 5)
+	resp, err := cli.Grant(context.TODO(), 5)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -332,7 +324,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
 
 	donec := make(chan struct{})
 	go func() {
-		if _, err := le.Revoke(context.TODO(), leaseID); err != grpc.ErrClientConnClosing {
+		if _, err := cli.Revoke(context.TODO(), leaseID); err != grpc.ErrClientConnClosing {
 			t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
 		}
 		close(donec)
@@ -466,15 +458,14 @@ func TestLeaseTimeToLive(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lapi := clientv3.NewLease(clus.RandClient())
-	defer lapi.Close()
+	lapi := clus.RandClient()
 
 	resp, err := lapi.Grant(context.Background(), 10)
 	if err != nil {
 		t.Errorf("failed to create lease %v", err)
 	}
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 	keys := []string{"foo1", "foo2"}
 	for i := range keys {
 		if _, err = kv.Put(context.TODO(), keys[i], "bar", clientv3.WithLease(resp.ID)); err != nil {
@@ -564,12 +555,13 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
 
 	ctx := context.Background()
 	cli := clus.Client(0)
+	clus.TakeClient(0)
 
 	resp, err := cli.Grant(ctx, 5)
 	if err != nil {
 		t.Fatal(err)
 	}
-	cli.Lease.Close()
+	cli.Close()
 
 	_, err = cli.KeepAlive(ctx, resp.ID)
 	if _, ok := err.(clientv3.ErrKeepAliveHalted); !ok {

+ 1 - 2
clientv3/integration/role_test.go

@@ -17,7 +17,6 @@ package integration
 import (
 	"testing"
 
-	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/integration"
 	"github.com/coreos/etcd/pkg/testutil"
@@ -30,7 +29,7 @@ func TestRoleError(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	authapi := clientv3.NewAuth(clus.RandClient())
+	authapi := clus.RandClient()
 
 	_, err := authapi.RoleAdd(context.TODO(), "test-role")
 	if err != nil {

+ 4 - 4
clientv3/integration/txn_test.go

@@ -33,7 +33,7 @@ func TestTxnError(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 	ctx := context.TODO()
 
 	_, err := kv.Txn(ctx).Then(clientv3.OpPut("foo", "bar1"), clientv3.OpPut("foo", "bar2")).Commit()
@@ -57,7 +57,7 @@ func TestTxnWriteFail(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.Client(0))
+	kv := clus.Client(0)
 
 	clus.Members[0].Stop(t)
 
@@ -105,7 +105,7 @@ func TestTxnReadRetry(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.Client(0))
+	kv := clus.Client(0)
 	clus.Members[0].Stop(t)
 	<-clus.Members[0].StopNotify()
 
@@ -136,7 +136,7 @@ func TestTxnSuccess(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kv := clientv3.NewKV(clus.Client(0))
+	kv := clus.Client(0)
 	ctx := context.TODO()
 
 	_, err := kv.Txn(ctx).Then(clientv3.OpPut("foo", "bar")).Commit()

+ 1 - 2
clientv3/integration/user_test.go

@@ -17,7 +17,6 @@ package integration
 import (
 	"testing"
 
-	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/integration"
 	"github.com/coreos/etcd/pkg/testutil"
@@ -30,7 +29,7 @@ func TestUserError(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	authapi := clientv3.NewAuth(clus.RandClient())
+	authapi := clus.RandClient()
 
 	_, err := authapi.UserAdd(context.TODO(), "foo", "bar")
 	if err != nil {

+ 12 - 22
clientv3/integration/watch_test.go

@@ -37,7 +37,6 @@ type watcherTest func(*testing.T, *watchctx)
 type watchctx struct {
 	clus          *integration.ClusterV3
 	w             clientv3.Watcher
-	wclient       *clientv3.Client
 	kv            clientv3.KV
 	wclientMember int
 	kvMember      int
@@ -51,19 +50,16 @@ func runWatchTest(t *testing.T, f watcherTest) {
 	defer clus.Terminate(t)
 
 	wclientMember := rand.Intn(3)
-	wclient := clus.Client(wclientMember)
-	w := clientv3.NewWatcher(wclient)
-	defer w.Close()
+	w := clus.Client(wclientMember).Watcher
 	// select a different client from wclient so puts succeed if
 	// a test knocks out the watcher client
 	kvMember := rand.Intn(3)
 	for kvMember == wclientMember {
 		kvMember = rand.Intn(3)
 	}
-	kvclient := clus.Client(kvMember)
-	kv := clientv3.NewKV(kvclient)
+	kv := clus.Client(kvMember).KV
 
-	wctx := &watchctx{clus, w, wclient, kv, wclientMember, kvMember, nil}
+	wctx := &watchctx{clus, w, kv, wclientMember, kvMember, nil}
 	f(t, wctx)
 }
 
@@ -359,8 +355,7 @@ func TestWatchResumeCompacted(t *testing.T) {
 	defer clus.Terminate(t)
 
 	// create a waiting watcher at rev 1
-	w := clientv3.NewWatcher(clus.Client(0))
-	defer w.Close()
+	w := clus.Client(0)
 	wch := w.Watch(context.Background(), "foo", clientv3.WithRev(1))
 	select {
 	case w := <-wch:
@@ -381,7 +376,7 @@ func TestWatchResumeCompacted(t *testing.T) {
 
 	// put some data and compact away
 	numPuts := 5
-	kv := clientv3.NewKV(clus.Client(1))
+	kv := clus.Client(1)
 	for i := 0; i < numPuts; i++ {
 		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
 			t.Fatal(err)
@@ -447,15 +442,14 @@ func TestWatchCompactRevision(t *testing.T) {
 	defer clus.Terminate(t)
 
 	// set some keys
-	kv := clientv3.NewKV(clus.RandClient())
+	kv := clus.RandClient()
 	for i := 0; i < 5; i++ {
 		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
 			t.Fatal(err)
 		}
 	}
 
-	w := clientv3.NewWatcher(clus.RandClient())
-	defer w.Close()
+	w := clus.RandClient()
 
 	if _, err := kv.Compact(context.TODO(), 4); err != nil {
 		t.Fatal(err)
@@ -493,8 +487,7 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	wc := clientv3.NewWatcher(clus.RandClient())
-	defer wc.Close()
+	wc := clus.RandClient()
 
 	opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
 	if watchOnPut {
@@ -511,7 +504,7 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
 		t.Fatalf("watch response expected in %v, but timed out", pi)
 	}
 
-	kvc := clientv3.NewKV(clus.RandClient())
+	kvc := clus.RandClient()
 	if _, err := kvc.Put(context.TODO(), "foox", "bar"); err != nil {
 		t.Fatal(err)
 	}
@@ -614,13 +607,11 @@ func TestWatchErrConnClosed(t *testing.T) {
 	defer clus.Terminate(t)
 
 	cli := clus.Client(0)
-	defer cli.Close()
-	wc := clientv3.NewWatcher(cli)
 
 	donec := make(chan struct{})
 	go func() {
 		defer close(donec)
-		ch := wc.Watch(context.TODO(), "foo")
+		ch := cli.Watch(context.TODO(), "foo")
 		if wr := <-ch; grpc.ErrorDesc(wr.Err()) != grpc.ErrClientConnClosing.Error() {
 			t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, grpc.ErrorDesc(wr.Err()))
 		}
@@ -652,9 +643,8 @@ func TestWatchAfterClose(t *testing.T) {
 
 	donec := make(chan struct{})
 	go func() {
-		wc := clientv3.NewWatcher(cli)
-		wc.Watch(context.TODO(), "foo")
-		if err := wc.Close(); err != nil && err != grpc.ErrClientConnClosing {
+		cli.Watch(context.TODO(), "foo")
+		if err := cli.Close(); err != nil && err != grpc.ErrClientConnClosing {
 			t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
 		}
 		close(donec)