Browse Source

clientv3: compose all clientv3 APIs into client struct

Anthony Romano 9 năm trước cách đây
mục cha
commit
5f62c05a6d

+ 17 - 18
clientv3/client.go

@@ -24,7 +24,6 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/transport"
 )
 
@@ -34,14 +33,10 @@ var (
 
 // Client provides and manages an etcd v3 client session.
 type Client struct {
-	// KV is the keyvalue API for the client's connection.
-	KV pb.KVClient
-	// Lease is the lease API for the client's connection.
-	Lease pb.LeaseClient
-	// Watch is the watch API for the client's connection.
-	Watch pb.WatchClient
-	// Cluster is the cluster API for the client's connection.
-	Cluster pb.ClusterClient
+	Cluster
+	KV
+	Lease
+	Watcher
 
 	conn   *grpc.ClientConn
 	cfg    Config
@@ -86,6 +81,8 @@ func NewFromURL(url string) (*Client, error) {
 
 // Close shuts down the client's etcd connections.
 func (c *Client) Close() error {
+	c.Watcher.Close()
+	c.Lease.Close()
 	return c.conn.Close()
 }
 
@@ -146,15 +143,17 @@ func newClient(cfg *Config) (*Client, error) {
 	if err != nil {
 		return nil, err
 	}
-	return &Client{
-		KV:      pb.NewKVClient(conn),
-		Lease:   pb.NewLeaseClient(conn),
-		Watch:   pb.NewWatchClient(conn),
-		Cluster: pb.NewClusterClient(conn),
-		conn:    conn,
-		cfg:     *cfg,
-		creds:   creds,
-	}, nil
+	client := &Client{
+		conn:  conn,
+		cfg:   *cfg,
+		creds: creds,
+	}
+	client.Cluster = NewCluster(client)
+	client.KV = NewKV(client)
+	client.Lease = NewLease(client)
+	client.Watcher = NewWatcher(client)
+
+	return client, nil
 }
 
 // ActiveConnection returns the current in-use connection

+ 21 - 0
integration/cluster.go

@@ -37,6 +37,7 @@ import (
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
@@ -733,3 +734,23 @@ func (c *ClusterV3) RandClient() *clientv3.Client {
 func (c *ClusterV3) Client(i int) *clientv3.Client {
 	return c.clients[i]
 }
+
+type grpcAPI struct {
+	// Cluster is the cluster API for the client's connection.
+	Cluster pb.ClusterClient
+	// KV is the keyvalue API for the client's connection.
+	KV pb.KVClient
+	// Lease is the lease API for the client's connection.
+	Lease pb.LeaseClient
+	// Watch is the watch API for the client's connection.
+	Watch pb.WatchClient
+}
+
+func toGRPC(c *clientv3.Client) grpcAPI {
+	return grpcAPI{
+		pb.NewClusterClient(c.ActiveConnection()),
+		pb.NewKVClient(c.ActiveConnection()),
+		pb.NewLeaseClient(c.ActiveConnection()),
+		pb.NewWatchClient(c.ActiveConnection()),
+	}
+}

+ 12 - 12
integration/v3_grpc_test.go

@@ -33,7 +33,7 @@ func TestV3PutOverwrite(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 	key := []byte("foo")
 	reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
 
@@ -77,7 +77,7 @@ func TestV3TxnTooManyOps(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 
 	// unique keys
 	i := new(int)
@@ -161,7 +161,7 @@ func TestV3TxnDuplicateKeys(t *testing.T) {
 	},
 	}
 
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 	tests := []struct {
 		txnSuccess []*pb.RequestUnion
 
@@ -208,7 +208,7 @@ func TestV3PutMissingLease(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 	key := []byte("foo")
 	preq := &pb.PutRequest{Key: key, Lease: 123456}
 	tests := []func(){
@@ -324,7 +324,7 @@ func TestV3DeleteRange(t *testing.T) {
 
 	for i, tt := range tests {
 		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
-		kvc := clus.RandClient().KV
+		kvc := toGRPC(clus.RandClient()).KV
 
 		ks := tt.keySet
 		for j := range ks {
@@ -375,7 +375,7 @@ func TestV3TxnInvaildRange(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 	preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
 
 	for i := 0; i < 3; i++ {
@@ -419,7 +419,7 @@ func TestV3TooLargeRequest(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 
 	// 2MB request value
 	largeV := make([]byte, 2*1024*1024)
@@ -437,7 +437,7 @@ func TestV3Hash(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 	preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
 
 	for i := 0; i < 3; i++ {
@@ -590,7 +590,7 @@ func TestV3RangeRequest(t *testing.T) {
 	for i, tt := range tests {
 		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 		for _, k := range tt.putKeys {
-			kvc := clus.RandClient().KV
+			kvc := toGRPC(clus.RandClient()).KV
 			req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
 			if _, err := kvc.Put(context.TODO(), req); err != nil {
 				t.Fatalf("#%d: couldn't put key (%v)", i, err)
@@ -598,7 +598,7 @@ func TestV3RangeRequest(t *testing.T) {
 		}
 
 		for j, req := range tt.reqs {
-			kvc := clus.RandClient().KV
+			kvc := toGRPC(clus.RandClient()).KV
 			resp, err := kvc.Range(context.TODO(), &req)
 			if err != nil {
 				t.Errorf("#%d.%d: Range error: %v", i, j, err)
@@ -668,7 +668,7 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) {
 	donec := make(chan error, 1)
 	go func() {
 		reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
-		_, perr := client.KV.Put(ctx, reqput)
+		_, perr := toGRPC(client).KV.Put(ctx, reqput)
 		donec <- perr
 	}()
 
@@ -717,7 +717,7 @@ func TestTLSGRPCAcceptSecureAll(t *testing.T) {
 	defer client.Close()
 
 	reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
-	if _, err := client.KV.Put(context.TODO(), reqput); err != nil {
+	if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
 		t.Fatalf("unexpected error on put over tls (%v)", err)
 	}
 }

+ 20 - 20
integration/v3_lease_test.go

@@ -33,7 +33,7 @@ func TestV3LeasePrmote(t *testing.T) {
 	defer clus.Terminate(t)
 
 	// create lease
-	lresp, err := clus.RandClient().Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5})
+	lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -78,7 +78,7 @@ func TestV3LeasePrmote(t *testing.T) {
 func TestV3LeaseRevoke(t *testing.T) {
 	defer testutil.AfterTest(t)
 	testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
-		lc := clus.RandClient().Lease
+		lc := toGRPC(clus.RandClient()).Lease
 		_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
 		return err
 	})
@@ -91,7 +91,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
 	defer clus.Terminate(t)
 
 	// create fixed lease
-	lresp, err := clus.RandClient().Lease.LeaseCreate(
+	lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate(
 		context.TODO(),
 		&pb.LeaseCreateRequest{ID: 1, TTL: 1})
 	if err != nil {
@@ -102,7 +102,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
 	}
 
 	// create duplicate fixed lease
-	lresp, err = clus.RandClient().Lease.LeaseCreate(
+	lresp, err = toGRPC(clus.RandClient()).Lease.LeaseCreate(
 		context.TODO(),
 		&pb.LeaseCreateRequest{ID: 1, TTL: 1})
 	if err != v3rpc.ErrLeaseExist {
@@ -110,7 +110,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
 	}
 
 	// create fresh fixed lease
-	lresp, err = clus.RandClient().Lease.LeaseCreate(
+	lresp, err = toGRPC(clus.RandClient()).Lease.LeaseCreate(
 		context.TODO(),
 		&pb.LeaseCreateRequest{ID: 2, TTL: 1})
 	if err != nil {
@@ -129,7 +129,7 @@ func TestV3LeaseExpire(t *testing.T) {
 
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
-		wStream, err := clus.RandClient().Watch.Watch(ctx)
+		wStream, err := toGRPC(clus.RandClient()).Watch.Watch(ctx)
 		if err != nil {
 			return err
 		}
@@ -177,7 +177,7 @@ func TestV3LeaseExpire(t *testing.T) {
 func TestV3LeaseKeepAlive(t *testing.T) {
 	defer testutil.AfterTest(t)
 	testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
-		lc := clus.RandClient().Lease
+		lc := toGRPC(clus.RandClient()).Lease
 		lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
@@ -215,7 +215,7 @@ func TestV3LeaseExists(t *testing.T) {
 	// create lease
 	ctx0, cancel0 := context.WithCancel(context.Background())
 	defer cancel0()
-	lresp, err := clus.RandClient().Lease.LeaseCreate(
+	lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate(
 		ctx0,
 		&pb.LeaseCreateRequest{TTL: 30})
 	if err != nil {
@@ -241,34 +241,34 @@ func TestV3LeaseSwitch(t *testing.T) {
 	// create lease
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	lresp1, err1 := clus.RandClient().Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30})
+	lresp1, err1 := toGRPC(clus.RandClient()).Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30})
 	if err1 != nil {
 		t.Fatal(err1)
 	}
-	lresp2, err2 := clus.RandClient().Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30})
+	lresp2, err2 := toGRPC(clus.RandClient()).Lease.LeaseCreate(ctx, &pb.LeaseCreateRequest{TTL: 30})
 	if err2 != nil {
 		t.Fatal(err2)
 	}
 
 	// attach key on lease1 then switch it to lease2
 	put1 := &pb.PutRequest{Key: []byte(key), Lease: lresp1.ID}
-	_, err := clus.RandClient().KV.Put(ctx, put1)
+	_, err := toGRPC(clus.RandClient()).KV.Put(ctx, put1)
 	if err != nil {
 		t.Fatal(err)
 	}
 	put2 := &pb.PutRequest{Key: []byte(key), Lease: lresp2.ID}
-	_, err = clus.RandClient().KV.Put(ctx, put2)
+	_, err = toGRPC(clus.RandClient()).KV.Put(ctx, put2)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	// revoke lease1 should not remove key
-	_, err = clus.RandClient().Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID})
+	_, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID})
 	if err != nil {
 		t.Fatal(err)
 	}
 	rreq := &pb.RangeRequest{Key: []byte("foo")}
-	rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq)
+	rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -277,11 +277,11 @@ func TestV3LeaseSwitch(t *testing.T) {
 	}
 
 	// revoke lease2 should remove key
-	_, err = clus.RandClient().Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID})
+	_, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID})
 	if err != nil {
 		t.Fatal(err)
 	}
-	rresp, err = clus.RandClient().KV.Range(context.TODO(), rreq)
+	rresp, err = toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -293,7 +293,7 @@ func TestV3LeaseSwitch(t *testing.T) {
 // acquireLeaseAndKey creates a new lease and creates an attached key.
 func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
 	// create lease
-	lresp, err := clus.RandClient().Lease.LeaseCreate(
+	lresp, err := toGRPC(clus.RandClient()).Lease.LeaseCreate(
 		context.TODO(),
 		&pb.LeaseCreateRequest{TTL: 1})
 	if err != nil {
@@ -304,7 +304,7 @@ func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
 	}
 	// attach to key
 	put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
-	if _, err := clus.RandClient().KV.Put(context.TODO(), put); err != nil {
+	if _, err := toGRPC(clus.RandClient()).KV.Put(context.TODO(), put); err != nil {
 		return 0, err
 	}
 	return lresp.ID, nil
@@ -327,7 +327,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
 
 	// confirm no key
 	rreq := &pb.RangeRequest{Key: []byte("foo")}
-	rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq)
+	rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -337,7 +337,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
 }
 
 func leaseExist(t *testing.T, clus *ClusterV3, leaseID int64) bool {
-	l := clus.RandClient().Lease
+	l := toGRPC(clus.RandClient()).Lease
 
 	_, err := l.LeaseCreate(context.Background(), &pb.LeaseCreateRequest{ID: leaseID, TTL: 5})
 	if err == nil {

+ 15 - 15
integration/v3_watch_test.go

@@ -180,7 +180,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 	for i, tt := range tests {
 		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 
-		wAPI := clus.RandClient().Watch
+		wAPI := toGRPC(clus.RandClient()).Watch
 		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 		defer cancel()
 		wStream, err := wAPI.Watch(ctx)
@@ -212,7 +212,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 		// asynchronously create keys
 		go func() {
 			for _, k := range tt.putKeys {
-				kvc := clus.RandClient().KV
+				kvc := toGRPC(clus.RandClient()).KV
 				req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
 				if _, err := kvc.Put(context.TODO(), req); err != nil {
 					t.Fatalf("#%d: couldn't put key (%v)", i, err)
@@ -273,7 +273,7 @@ func testV3WatchCancel(t *testing.T, startRev int64) {
 
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
-	wStream, errW := clus.RandClient().Watch.Watch(ctx)
+	wStream, errW := toGRPC(clus.RandClient()).Watch.Watch(ctx)
 	if errW != nil {
 		t.Fatalf("wAPI.Watch error: %v", errW)
 	}
@@ -308,7 +308,7 @@ func testV3WatchCancel(t *testing.T, startRev int64) {
 		t.Errorf("cresp.Canceled got = %v, want = true", cresp.Canceled)
 	}
 
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
 		t.Errorf("couldn't put key (%v)", err)
 	}
@@ -331,7 +331,7 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) {
 
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
-	wStream, wErr := clus.RandClient().Watch.Watch(ctx)
+	wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
 	if wErr != nil {
 		t.Fatalf("wAPI.Watch error: %v", wErr)
 	}
@@ -341,7 +341,7 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) {
 	// first revision already allocated as empty revision
 	for i := 1; i < nrRevisions; i++ {
 		go func() {
-			kvc := clus.RandClient().KV
+			kvc := toGRPC(clus.RandClient()).KV
 			req := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
 			if _, err := kvc.Put(context.TODO(), req); err != nil {
 				t.Fatalf("couldn't put key (%v)", err)
@@ -418,11 +418,11 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) {
 // one watcher to test if it receives expected events.
 func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
-	wStream, errW := clus.RandClient().Watch.Watch(ctx)
+	wStream, errW := toGRPC(clus.RandClient()).Watch.Watch(ctx)
 	if errW != nil {
 		t.Fatalf("wAPI.Watch error: %v", errW)
 	}
@@ -523,7 +523,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
 
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
-	wStream, wErr := clus.RandClient().Watch.Watch(ctx)
+	wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
 	if wErr != nil {
 		t.Fatalf("wAPI.Watch error: %v", wErr)
 	}
@@ -535,7 +535,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
 		t.Fatalf("wStream.Send error: %v", err)
 	}
 
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 	txn := pb.TxnRequest{}
 	for i := 0; i < 3; i++ {
 		ru := &pb.RequestUnion{}
@@ -605,7 +605,7 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	kvc := clus.RandClient().KV
+	kvc := toGRPC(clus.RandClient()).KV
 
 	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
 		t.Fatalf("couldn't put key (%v)", err)
@@ -616,7 +616,7 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
 
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
-	wStream, wErr := clus.RandClient().Watch.Watch(ctx)
+	wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
 	if wErr != nil {
 		t.Fatalf("wAPI.Watch error: %v", wErr)
 	}
@@ -692,8 +692,8 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) {
 // testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
 func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
-	wAPI := clus.RandClient().Watch
-	kvc := clus.RandClient().KV
+	wAPI := toGRPC(clus.RandClient()).Watch
+	kvc := toGRPC(clus.RandClient()).KV
 
 	streams := make([]pb.Watch_WatchClient, 5)
 	for i := range streams {
@@ -792,7 +792,7 @@ func TestV3WatchInvalidFutureRevision(t *testing.T) {
 
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
-	wStream, wErr := clus.RandClient().Watch.Watch(ctx)
+	wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
 	if wErr != nil {
 		t.Fatalf("wAPI.Watch error: %v", wErr)
 	}