
contrib/recipes, integration: use clientv3

updating both together since there's a circular dependency
Anthony Romano, 10 years ago
commit 56fce9f386

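For code outside this diff, the upshot is that every recipe constructor now takes a *clientv3.Client rather than the recipes-local EtcdClient wrapper deleted below. A minimal caller sketch, assuming an already reachable etcd gRPC endpoint and un-vendored import paths; only clientv3.NewFromConn, client.Close, recipe.NewBarrier, and Barrier.Hold are taken from this diff, everything else (address, error handling) is illustrative:

```go
package main

import (
	"log"

	"google.golang.org/grpc"

	"github.com/coreos/etcd/clientv3"
	recipe "github.com/coreos/etcd/contrib/recipes"
)

func main() {
	// Dial an etcd v3 gRPC endpoint (address is illustrative).
	conn, err := grpc.Dial("127.0.0.1:2378", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	// Before this commit: recipe.NewBarrier(recipe.NewEtcdClient(conn), ...).
	// After: wrap the connection once and pass the client to every recipe.
	client := clientv3.NewFromConn(conn)
	defer client.Close()

	b := recipe.NewBarrier(client, "example-barrier")
	if err := b.Hold(); err != nil {
		log.Fatal(err)
	}
	// Other processes creating a Barrier on "example-barrier" now block until
	// the key is released (see the Barrier doc comment in barrier.go below).
}
```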
+ 3 - 2
contrib/recipes/barrier.go

@@ -16,6 +16,7 @@ package recipe
 
 import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/storage/storagepb"
 )
@@ -23,11 +24,11 @@ import (
 // Barrier creates a key in etcd to block processes, then deletes the key to
 // release all blocked processes.
 type Barrier struct {
-	client *EtcdClient
+	client *clientv3.Client
 	key    string
 }
 
-func NewBarrier(client *EtcdClient, key string) *Barrier {
+func NewBarrier(client *clientv3.Client, key string) *Barrier {
 	return &Barrier{client, key}
 }
 

+ 4 - 19
contrib/recipes/client.go

@@ -18,7 +18,6 @@ import (
 	"errors"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	spb "github.com/coreos/etcd/storage/storagepb"
 )
@@ -28,22 +27,8 @@ var (
 	ErrWaitMismatch = errors.New("unexpected wait result")
 )
 
-type EtcdClient struct {
-	conn  *grpc.ClientConn
-	KV    pb.KVClient
-	Lease pb.LeaseClient
-	Watch pb.WatchClient
-}
-
-func NewEtcdClient(conn *grpc.ClientConn) *EtcdClient {
-	kv := pb.NewKVClient(conn)
-	lease := pb.NewLeaseClient(conn)
-	watch := pb.NewWatchClient(conn)
-	return &EtcdClient{conn, kv, lease, watch}
-}
-
 // deleteRevKey deletes a key by revision, returning false if key is missing
-func (ec *EtcdClient) deleteRevKey(key string, rev int64) (bool, error) {
+func deleteRevKey(kvc pb.KVClient, key string, rev int64) (bool, error) {
 	cmp := &pb.Compare{
 		Result:      pb.Compare_EQUAL,
 		Target:      pb.Compare_MOD,
@@ -52,7 +37,7 @@ func (ec *EtcdClient) deleteRevKey(key string, rev int64) (bool, error) {
 	}
 	req := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{
 		RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}
-	txnresp, err := ec.KV.Txn(
+	txnresp, err := kvc.Txn(
 		context.TODO(),
 		&pb.TxnRequest{
 			Compare: []*pb.Compare{cmp},
@@ -67,9 +52,9 @@ func (ec *EtcdClient) deleteRevKey(key string, rev int64) (bool, error) {
 	return true, nil
 }
 
-func (ec *EtcdClient) claimFirstKey(kvs []*spb.KeyValue) (*spb.KeyValue, error) {
+func claimFirstKey(kvc pb.KVClient, kvs []*spb.KeyValue) (*spb.KeyValue, error) {
 	for _, kv := range kvs {
-		ok, err := ec.deleteRevKey(string(kv.Key), kv.ModRevision)
+		ok, err := deleteRevKey(kvc, string(kv.Key), kv.ModRevision)
 		if err != nil {
 			return nil, err
 		} else if ok {

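client.go turns the old EtcdClient methods into package-level helpers parameterized by the raw pb.KVClient, which the queue recipes below obtain as q.client.KV. A hedged in-package sketch of the new call shape; the helper name removeAtRev and its fmt-based error are hypothetical, only deleteRevKey's new signature comes from the hunk above:

```go
// removeAtRev is a hypothetical helper inside package recipe showing the new
// call shape: the pb.KVClient comes straight off the shared clientv3.Client.
func removeAtRev(client *clientv3.Client, key string, rev int64) error {
	ok, err := deleteRevKey(client.KV, key, rev)
	if err != nil {
		return err
	}
	if !ok {
		return fmt.Errorf("key %q missing at mod revision %d", key, rev)
	}
	return nil
}
```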
+ 10 - 9
contrib/recipes/key.go

@@ -20,23 +20,24 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
 )
 
 // Key is a key/revision pair created by the client and stored on etcd
 type RemoteKV struct {
-	client *EtcdClient
+	client *clientv3.Client
 	key    string
 	rev    int64
 	val    string
 }
 
-func NewKey(client *EtcdClient, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
+func NewKey(client *clientv3.Client, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
 	return NewKV(client, key, "", leaseID)
 }
 
-func NewKV(client *EtcdClient, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
+func NewKV(client *clientv3.Client, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
 	rev, err := putNewKV(client, key, val, leaseID)
 	if err != nil {
 		return nil, err
@@ -44,7 +45,7 @@ func NewKV(client *EtcdClient, key, val string, leaseID lease.LeaseID) (*RemoteK
 	return &RemoteKV{client, key, rev, val}, nil
 }
 
-func GetRemoteKV(client *EtcdClient, key string) (*RemoteKV, error) {
+func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) {
 	resp, err := client.KV.Range(
 		context.TODO(),
 		&pb.RangeRequest{Key: []byte(key)},
@@ -65,11 +66,11 @@ func GetRemoteKV(client *EtcdClient, key string) (*RemoteKV, error) {
 		val:    val}, nil
 }
 
-func NewUniqueKey(client *EtcdClient, prefix string) (*RemoteKV, error) {
+func NewUniqueKey(client *clientv3.Client, prefix string) (*RemoteKV, error) {
 	return NewUniqueKV(client, prefix, "", 0)
 }
 
-func NewUniqueKV(client *EtcdClient, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
+func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
 	for {
 		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
 		rev, err := putNewKV(client, newKey, val, 0)
@@ -84,7 +85,7 @@ func NewUniqueKV(client *EtcdClient, prefix string, val string, leaseID lease.Le
 
 // putNewKV attempts to create the given key, only succeeding if the key did
 // not yet exist.
-func putNewKV(ec *EtcdClient, key, val string, leaseID lease.LeaseID) (int64, error) {
+func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int64, error) {
 	cmp := &pb.Compare{
 		Result:      pb.Compare_EQUAL,
 		Target:      pb.Compare_VERSION,
@@ -110,13 +111,13 @@ func putNewKV(ec *EtcdClient, key, val string, leaseID lease.LeaseID) (int64, er
 }
 
 // NewSequentialKV allocates a new sequential key-value pair at <prefix>/nnnnn
-func NewSequentialKV(client *EtcdClient, prefix, val string) (*RemoteKV, error) {
+func NewSequentialKV(client *clientv3.Client, prefix, val string) (*RemoteKV, error) {
 	return newSequentialKV(client, prefix, val, 0)
 }
 
 // newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
 // value and lease.  Note: a bookkeeping node __<prefix> is also allocated.
-func newSequentialKV(client *EtcdClient, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
+func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
 	resp, err := NewRange(client, prefix).LastKey()
 	if err != nil {
 		return nil, err

+ 4 - 3
contrib/recipes/mutex.go

@@ -17,17 +17,18 @@ package recipe
 import (
 	"sync"
 
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 // Mutex implements the sync Locker interface with etcd
 type Mutex struct {
-	client *EtcdClient
+	client *clientv3.Client
 	key    string
 	myKey  *RemoteKV
 }
 
-func NewMutex(client *EtcdClient, key string) *Mutex {
+func NewMutex(client *clientv3.Client, key string) *Mutex {
 	return &Mutex{client, key, nil}
 }
 
@@ -80,6 +81,6 @@ func (lm *lockerMutex) Unlock() {
 	}
 }
 
-func NewLocker(client *EtcdClient, key string) sync.Locker {
+func NewLocker(client *clientv3.Client, key string) sync.Locker {
 	return &lockerMutex{NewMutex(client, key)}
 }

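Since NewLocker still returns a sync.Locker, callers only change the first argument. A hypothetical usage sketch, with imports as in the surrounding recipe tests and an illustrative key name:

```go
// withLeaderLock runs fn while holding the etcd-backed lock. Hypothetical
// caller code; only NewLocker's *clientv3.Client signature comes from the diff.
func withLeaderLock(client *clientv3.Client, fn func()) {
	l := recipe.NewLocker(client, "leader-election")
	l.Lock()
	defer l.Unlock()
	fn()
}
```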
+ 5 - 4
contrib/recipes/priority_queue.go

@@ -17,17 +17,18 @@ package recipe
 import (
 	"fmt"
 
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 // PriorityQueue implements a multi-reader, multi-writer distributed queue.
 type PriorityQueue struct {
-	client *EtcdClient
+	client *clientv3.Client
 	key    string
 }
 
 // NewPriorityQueue creates an etcd priority queue.
-func NewPriorityQueue(client *EtcdClient, key string) *PriorityQueue {
+func NewPriorityQueue(client *clientv3.Client, key string) *PriorityQueue {
 	return &PriorityQueue{client, key + "/"}
 }
 
@@ -47,7 +48,7 @@ func (q *PriorityQueue) Dequeue() (string, error) {
 		return "", err
 	}
 
-	kv, err := q.client.claimFirstKey(resp.Kvs)
+	kv, err := claimFirstKey(q.client.KV, resp.Kvs)
 	if err != nil {
 		return "", err
 	} else if kv != nil {
@@ -67,7 +68,7 @@ func (q *PriorityQueue) Dequeue() (string, error) {
 		return "", err
 	}
 
-	ok, err := q.client.deleteRevKey(string(ev.Kv.Key), ev.Kv.ModRevision)
+	ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision)
 	if err != nil {
 		return "", err
 	} else if !ok {

+ 5 - 4
contrib/recipes/queue.go

@@ -15,16 +15,17 @@
 package recipe
 
 import (
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 // Queue implements a multi-reader, multi-writer distributed queue.
 type Queue struct {
-	client    *EtcdClient
+	client    *clientv3.Client
 	keyPrefix string
 }
 
-func NewQueue(client *EtcdClient, keyPrefix string) *Queue {
+func NewQueue(client *clientv3.Client, keyPrefix string) *Queue {
 	return &Queue{client, keyPrefix}
 }
 
@@ -42,7 +43,7 @@ func (q *Queue) Dequeue() (string, error) {
 		return "", err
 	}
 
-	kv, err := q.client.claimFirstKey(resp.Kvs)
+	kv, err := claimFirstKey(q.client.KV, resp.Kvs)
 	if err != nil {
 		return "", err
 	} else if kv != nil {
@@ -62,7 +63,7 @@ func (q *Queue) Dequeue() (string, error) {
 		return "", err
 	}
 
-	ok, err := q.client.deleteRevKey(string(ev.Kv.Key), ev.Kv.ModRevision)
+	ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision)
 	if err != nil {
 		return "", err
 	} else if !ok {

+ 3 - 2
contrib/recipes/range.go

@@ -16,6 +16,7 @@ package recipe
 
 import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
@@ -26,11 +27,11 @@ type Range struct {
 	keyEnd []byte
 }
 
-func NewRange(client *EtcdClient, key string) *Range {
+func NewRange(client *clientv3.Client, key string) *Range {
 	return NewRangeRev(client, key, 0)
 }
 
-func NewRangeRev(client *EtcdClient, key string, rev int64) *Range {
+func NewRangeRev(client *clientv3.Client, key string, rev int64) *Range {
 	return &Range{client.KV, []byte(key), rev, prefixEnd(key)}
 }
 

+ 3 - 2
contrib/recipes/rwmutex.go

@@ -15,16 +15,17 @@
 package recipe
 
 import (
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 type RWMutex struct {
-	client *EtcdClient
+	client *clientv3.Client
 	key    string
 	myKey  *RemoteKV
 }
 
-func NewRWMutex(client *EtcdClient, key string) *RWMutex {
+func NewRWMutex(client *clientv3.Client, key string) *RWMutex {
 	return &RWMutex{client, key, nil}
 }
 

+ 3 - 2
contrib/recipes/stm.go

@@ -16,12 +16,13 @@ package recipe
 
 import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
 // STM implements software transactional memory over etcd
 type STM struct {
-	client *EtcdClient
+	client *clientv3.Client
 	// rset holds the read key's value and revision of read
 	rset map[string]*RemoteKV
 	// wset holds the write key and its value
@@ -32,7 +33,7 @@ type STM struct {
 }
 
 // NewSTM creates new transaction loop for a given apply function.
-func NewSTM(client *EtcdClient, apply func(*STM) error) <-chan error {
+func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error {
 	s := &STM{client: client, apply: apply}
 	errc := make(chan error, 1)
 	go func() {

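NewSTM keeps its apply-function-plus-error-channel shape and only swaps the client type; stm.Get and stm.Put appear with the same usage in v3_stm_test.go further down. A hypothetical transaction sketch, where Get is assumed to return (string, error) as its two-value use in those tests suggests:

```go
// copyKey copies the current value of src into dst inside one STM transaction.
// Hypothetical caller; NewSTM, Get, and Put usages mirror the diff and tests.
func copyKey(client *clientv3.Client, src, dst string) error {
	apply := func(stm *recipe.STM) error {
		v, err := stm.Get(src) // read is recorded in the rset
		if err != nil {
			return err
		}
		stm.Put(dst, v) // write is buffered in the wset until commit
		return nil
	}
	return <-recipe.NewSTM(client, apply)
}
```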
+ 6 - 5
contrib/recipes/watch.go

@@ -16,6 +16,7 @@ package recipe
 
 import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/storage"
 	"github.com/coreos/etcd/storage/storagepb"
@@ -30,15 +31,15 @@ type Watcher struct {
 	lastErr error
 }
 
-func NewWatcher(c *EtcdClient, key string, rev int64) (*Watcher, error) {
+func NewWatcher(c *clientv3.Client, key string, rev int64) (*Watcher, error) {
 	return newWatcher(c, key, rev, false)
 }
 
-func NewPrefixWatcher(c *EtcdClient, prefix string, rev int64) (*Watcher, error) {
+func NewPrefixWatcher(c *clientv3.Client, prefix string, rev int64) (*Watcher, error) {
 	return newWatcher(c, prefix, rev, true)
 }
 
-func newWatcher(c *EtcdClient, key string, rev int64, isPrefix bool) (*Watcher, error) {
+func newWatcher(c *clientv3.Client, key string, rev int64, isPrefix bool) (*Watcher, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	w, err := c.Watch.Watch(ctx)
 	if err != nil {
@@ -134,7 +135,7 @@ func (w *Watcher) waitEvents(evs []storagepb.Event_EventType) (*storagepb.Event,
 }
 
 // WaitEvents waits on a key until it observes the given events and returns the final one.
-func WaitEvents(c *EtcdClient, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
+func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
 	w, err := NewWatcher(c, key, rev)
 	if err != nil {
 		return nil, err
@@ -143,7 +144,7 @@ func WaitEvents(c *EtcdClient, key string, rev int64, evs []storagepb.Event_Even
 	return w.waitEvents(evs)
 }
 
-func WaitPrefixEvents(c *EtcdClient, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
+func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
 	w, err := NewPrefixWatcher(c, prefix, rev)
 	if err != nil {
 		return nil, err

+ 8 - 3
integration/cluster_test.go

@@ -32,6 +32,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
@@ -729,8 +730,8 @@ func (m *member) listenGRPC() error {
 	return nil
 }
 
-// newGrpcClient creates a new grpc client connection to the member
-func NewGRPCClient(m *member) (*grpc.ClientConn, error) {
+// NewClientV3 creates a new grpc client connection to the member
+func NewClientV3(m *member) (*clientv3.Client, error) {
 	if m.grpcAddr == "" {
 		return nil, fmt.Errorf("member not configured for grpc")
 	}
@@ -738,7 +739,11 @@ func NewGRPCClient(m *member) (*grpc.ClientConn, error) {
 		return net.Dial("unix", a)
 	}
 	unixdialer := grpc.WithDialer(f)
-	return grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer)
+	conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer)
+	if err != nil {
+		return nil, err
+	}
+	return clientv3.NewFromConn(conn), nil
 }
 
 // Clone returns a member with the same server configuration. The returned

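NewClientV3 now hides the gRPC dial and returns a ready client, so tests can take the service stubs (KV, Lease, Watch) straight off it, as the v3 tests below do. A small hypothetical helper in the same spirit:

```go
// mustNewClientV3 is a hypothetical test helper that fails the test when the
// member has no gRPC address or the dial fails; it mirrors newClusterV3 below.
func mustNewClientV3(t *testing.T, m *member) *clientv3.Client {
	c, err := NewClientV3(m)
	if err != nil {
		t.Fatalf("NewClientV3 error: %v", err)
	}
	return c
}
```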
+ 8 - 8
integration/v3_barrier_test.go

@@ -17,27 +17,27 @@ import (
 	"testing"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/contrib/recipes"
 	"github.com/coreos/etcd/pkg/testutil"
 )
 
 func TestBarrierSingleNode(t *testing.T) {
 	defer testutil.AfterTest(t)
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
-	testBarrier(t, 5, func() *grpc.ClientConn { return clus.conns[0] })
+	testBarrier(t, 5, func() *clientv3.Client { return clus.clients[0] })
 }
 
 func TestBarrierMultiNode(t *testing.T) {
 	defer testutil.AfterTest(t)
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
-	testBarrier(t, 5, func() *grpc.ClientConn { return clus.RandConn() })
+	testBarrier(t, 5, func() *clientv3.Client { return clus.RandClient() })
 }
 
-func testBarrier(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
-	b := recipe.NewBarrier(recipe.NewEtcdClient(chooseConn()), "test-barrier")
+func testBarrier(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
+	b := recipe.NewBarrier(chooseClient(), "test-barrier")
 	if err := b.Hold(); err != nil {
 		t.Fatalf("could not hold barrier (%v)", err)
 	}
@@ -48,7 +48,7 @@ func testBarrier(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn)
 	donec := make(chan struct{})
 	for i := 0; i < waiters; i++ {
 		go func() {
-			br := recipe.NewBarrier(recipe.NewEtcdClient(chooseConn()), "test-barrier")
+			br := recipe.NewBarrier(chooseClient(), "test-barrier")
 			if err := br.Wait(); err != nil {
 				t.Fatalf("could not wait on barrier (%v)", err)
 			}

+ 55 - 60
integration/v3_grpc_test.go

@@ -24,7 +24,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
@@ -34,47 +34,47 @@ import (
 
 type clusterV3 struct {
 	*cluster
-	conns []*grpc.ClientConn
+	clients []*clientv3.Client
 }
 
-// newClusterGRPC returns a launched cluster with a grpc client connection
+// newClusterV3 returns a launched cluster with a grpc client connection
 // for each cluster member.
-func newClusterGRPC(t *testing.T, cfg *clusterConfig) *clusterV3 {
+func newClusterV3(t *testing.T, cfg *clusterConfig) *clusterV3 {
 	cfg.useV3 = true
 	cfg.useGRPC = true
 	clus := &clusterV3{cluster: NewClusterByConfig(t, cfg)}
 	for _, m := range clus.Members {
-		conn, err := NewGRPCClient(m)
+		client, err := NewClientV3(m)
 		if err != nil {
 			t.Fatal(err)
 		}
-		clus.conns = append(clus.conns, conn)
+		clus.clients = append(clus.clients, client)
 	}
 	clus.Launch(t)
 	return clus
 }
 
 func (c *clusterV3) Terminate(t *testing.T) {
-	for _, conn := range c.conns {
-		if err := conn.Close(); err != nil {
+	for _, client := range c.clients {
+		if err := client.Close(); err != nil {
 			t.Error(err)
 		}
 	}
 	c.cluster.Terminate(t)
 }
 
-func (c *clusterV3) RandConn() *grpc.ClientConn {
-	return c.conns[rand.Intn(len(c.conns))]
+func (c *clusterV3) RandClient() *clientv3.Client {
+	return c.clients[rand.Intn(len(c.clients))]
 }
 
 // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
 // overwrites it, then checks that the change was applied.
 func TestV3PutOverwrite(t *testing.T) {
 	defer testutil.AfterTest(t)
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 
-	kvc := pb.NewKVClient(clus.RandConn())
+	kvc := clus.RandClient().KV
 	key := []byte("foo")
 	reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
 
@@ -115,10 +115,10 @@ func TestV3PutOverwrite(t *testing.T) {
 
 func TestV3TxnTooManyOps(t *testing.T) {
 	defer testutil.AfterTest(t)
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 
-	kvc := pb.NewKVClient(clus.RandConn())
+	kvc := clus.RandClient().KV
 
 	addCompareOps := func(txn *pb.TxnRequest) {
 		txn.Compare = append(txn.Compare,
@@ -173,10 +173,10 @@ func TestV3TxnTooManyOps(t *testing.T) {
 // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
 func TestV3PutMissingLease(t *testing.T) {
 	defer testutil.AfterTest(t)
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 
-	kvc := pb.NewKVClient(clus.RandConn())
+	kvc := clus.RandClient().KV
 	key := []byte("foo")
 	preq := &pb.PutRequest{Key: key, Lease: 123456}
 	tests := []func(){
@@ -290,8 +290,8 @@ func TestV3DeleteRange(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		clus := newClusterGRPC(t, &clusterConfig{size: 3})
-		kvc := pb.NewKVClient(clus.RandConn())
+		clus := newClusterV3(t, &clusterConfig{size: 3})
+		kvc := clus.RandClient().KV
 
 		ks := tt.keySet
 		for j := range ks {
@@ -336,10 +336,10 @@ func TestV3DeleteRange(t *testing.T) {
 // TestV3TxnInvaildRange tests txn
 func TestV3TxnInvaildRange(t *testing.T) {
 	defer testutil.AfterTest(t)
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 
-	kvc := pb.NewKVClient(clus.RandConn())
+	kvc := clus.RandClient().KV
 	preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
 
 	for i := 0; i < 3; i++ {
@@ -553,9 +553,9 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		clus := newClusterGRPC(t, &clusterConfig{size: 3})
+		clus := newClusterV3(t, &clusterConfig{size: 3})
 
-		wAPI := pb.NewWatchClient(clus.RandConn())
+		wAPI := clus.RandClient().Watch
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
 		wStream, err := wAPI.Watch(ctx)
@@ -569,7 +569,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 
 		go func() {
 			for _, k := range tt.putKeys {
-				kvc := pb.NewKVClient(clus.RandConn())
+				kvc := clus.RandClient().KV
 				req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
 				if _, err := kvc.Put(context.TODO(), req); err != nil {
 					t.Fatalf("#%d: couldn't put key (%v)", i, err)
@@ -629,12 +629,11 @@ func TestV3WatchCancelUnsynced(t *testing.T) {
 }
 
 func testV3WatchCancel(t *testing.T, startRev int64) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
-	wAPI := pb.NewWatchClient(clus.RandConn())
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	wStream, errW := wAPI.Watch(ctx)
+	wStream, errW := clus.RandClient().Watch.Watch(ctx)
 	if errW != nil {
 		t.Fatalf("wAPI.Watch error: %v", errW)
 	}
@@ -669,7 +668,7 @@ func testV3WatchCancel(t *testing.T, startRev int64) {
 		t.Errorf("cresp.Canceled got = %v, want = true", cresp.Canceled)
 	}
 
-	kvc := pb.NewKVClient(clus.RandConn())
+	kvc := clus.RandClient().KV
 	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
 		t.Errorf("couldn't put key (%v)", err)
 	}
@@ -698,13 +697,12 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) {
 // that matches all watchers, and another key that matches only
 // one watcher to test if it receives expected events.
 func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
-	wAPI := pb.NewWatchClient(clus.RandConn())
-	kvc := pb.NewKVClient(clus.RandConn())
+	clus := newClusterV3(t, &clusterConfig{size: 3})
+	kvc := clus.RandClient().KV
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	wStream, errW := wAPI.Watch(ctx)
+	wStream, errW := clus.RandClient().Watch.Watch(ctx)
 	if errW != nil {
 		t.Fatalf("wAPI.Watch error: %v", errW)
 	}
@@ -801,12 +799,11 @@ func TestV3WatchMultipleEventsTxnUnsynced(t *testing.T) {
 
 // testV3WatchMultipleEventsTxn tests Watch APIs when it receives multiple events.
 func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 
-	wAPI := pb.NewWatchClient(clus.RandConn())
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	wStream, wErr := wAPI.Watch(ctx)
+	wStream, wErr := clus.RandClient().Watch.Watch(ctx)
 	if wErr != nil {
 		t.Fatalf("wAPI.Watch error: %v", wErr)
 	}
@@ -818,7 +815,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
 		t.Fatalf("wStream.Send error: %v", err)
 	}
 
-	kvc := pb.NewKVClient(clus.RandConn())
+	kvc := clus.RandClient().KV
 	txn := pb.TxnRequest{}
 	for i := 0; i < 3; i++ {
 		ru := &pb.RequestUnion{}
@@ -885,10 +882,10 @@ func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.
 
 func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
 	defer testutil.AfterTest(t)
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 
-	kvc := pb.NewKVClient(clus.RandConn())
+	kvc := clus.RandClient().KV
 
 	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
 		t.Fatalf("couldn't put key (%v)", err)
@@ -897,10 +894,9 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
 		t.Fatalf("couldn't put key (%v)", err)
 	}
 
-	wAPI := pb.NewWatchClient(clus.RandConn())
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	wStream, wErr := wAPI.Watch(ctx)
+	wStream, wErr := clus.RandClient().Watch.Watch(ctx)
 	if wErr != nil {
 		t.Fatalf("wAPI.Watch error: %v", wErr)
 	}
@@ -975,9 +971,9 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) {
 
 // testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
 func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
-	wAPI := pb.NewWatchClient(clus.RandConn())
-	kvc := pb.NewKVClient(clus.RandConn())
+	clus := newClusterV3(t, &clusterConfig{size: 3})
+	wAPI := clus.RandClient().Watch
+	kvc := clus.RandClient().KV
 
 	streams := make([]pb.Watch_WatchClient, 5)
 	for i := range streams {
@@ -1199,9 +1195,9 @@ func TestV3RangeRequest(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		clus := newClusterGRPC(t, &clusterConfig{size: 3})
+		clus := newClusterV3(t, &clusterConfig{size: 3})
 		for _, k := range tt.putKeys {
-			kvc := pb.NewKVClient(clus.RandConn())
+			kvc := clus.RandClient().KV
 			req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
 			if _, err := kvc.Put(context.TODO(), req); err != nil {
 				t.Fatalf("#%d: couldn't put key (%v)", i, err)
@@ -1209,7 +1205,7 @@ func TestV3RangeRequest(t *testing.T) {
 		}
 
 		for j, req := range tt.reqs {
-			kvc := pb.NewKVClient(clus.RandConn())
+			kvc := clus.RandClient().KV
 			resp, err := kvc.Range(context.TODO(), &req)
 			if err != nil {
 				t.Errorf("#%d.%d: Range error: %v", i, j, err)
@@ -1244,7 +1240,7 @@ func TestV3RangeRequest(t *testing.T) {
 func TestV3LeaseRevoke(t *testing.T) {
 	defer testutil.AfterTest(t)
 	testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
-		lc := pb.NewLeaseClient(clus.RandConn())
+		lc := clus.RandClient().Lease
 		_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
 		return err
 	})
@@ -1253,11 +1249,11 @@ func TestV3LeaseRevoke(t *testing.T) {
 // TestV3LeaseCreateById ensures leases may be created by a given id.
 func TestV3LeaseCreateByID(t *testing.T) {
 	defer testutil.AfterTest(t)
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 
 	// create fixed lease
-	lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
+	lresp, err := clus.RandClient().Lease.LeaseCreate(
 		context.TODO(),
 		&pb.LeaseCreateRequest{ID: 1, TTL: 1})
 	if err != nil {
@@ -1268,7 +1264,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
 	}
 
 	// create duplicate fixed lease
-	lresp, err = pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
+	lresp, err = clus.RandClient().Lease.LeaseCreate(
 		context.TODO(),
 		&pb.LeaseCreateRequest{ID: 1, TTL: 1})
 	if err != nil {
@@ -1279,7 +1275,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
 	}
 
 	// create fresh fixed lease
-	lresp, err = pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
+	lresp, err = clus.RandClient().Lease.LeaseCreate(
 		context.TODO(),
 		&pb.LeaseCreateRequest{ID: 2, TTL: 1})
 	if err != nil {
@@ -1297,10 +1293,9 @@ func TestV3LeaseExpire(t *testing.T) {
 	testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
 		// let lease lapse; wait for deleted key
 
-		wAPI := pb.NewWatchClient(clus.RandConn())
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
-		wStream, err := wAPI.Watch(ctx)
+		wStream, err := clus.RandClient().Watch.Watch(ctx)
 		if err != nil {
 			return err
 		}
@@ -1348,7 +1343,7 @@ func TestV3LeaseExpire(t *testing.T) {
 func TestV3LeaseKeepAlive(t *testing.T) {
 	defer testutil.AfterTest(t)
 	testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
-		lc := pb.NewLeaseClient(clus.RandConn())
+		lc := clus.RandClient().Lease
 		lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
@@ -1381,13 +1376,13 @@ func TestV3LeaseKeepAlive(t *testing.T) {
 // client to confirm it's visible to the whole cluster.
 func TestV3LeaseExists(t *testing.T) {
 	defer testutil.AfterTest(t)
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 
 	// create lease
 	ctx0, cancel0 := context.WithCancel(context.Background())
 	defer cancel0()
-	lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
+	lresp, err := clus.RandClient().Lease.LeaseCreate(
 		ctx0,
 		&pb.LeaseCreateRequest{TTL: 30})
 	if err != nil {
@@ -1400,7 +1395,7 @@ func TestV3LeaseExists(t *testing.T) {
 	// confirm keepalive
 	ctx1, cancel1 := context.WithCancel(context.Background())
 	defer cancel1()
-	lac, err := pb.NewLeaseClient(clus.RandConn()).LeaseKeepAlive(ctx1)
+	lac, err := clus.RandClient().Lease.LeaseKeepAlive(ctx1)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1416,7 +1411,7 @@ func TestV3LeaseExists(t *testing.T) {
 // acquireLeaseAndKey creates a new lease and creates an attached key.
 func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
 	// create lease
-	lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
+	lresp, err := clus.RandClient().Lease.LeaseCreate(
 		context.TODO(),
 		&pb.LeaseCreateRequest{TTL: 1})
 	if err != nil {
@@ -1427,7 +1422,7 @@ func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
 	}
 	// attach to key
 	put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
-	if _, err := pb.NewKVClient(clus.RandConn()).Put(context.TODO(), put); err != nil {
+	if _, err := clus.RandClient().KV.Put(context.TODO(), put); err != nil {
 		return 0, err
 	}
 	return lresp.ID, nil
@@ -1436,7 +1431,7 @@ func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
 // testLeaseRemoveLeasedKey performs some action while holding a lease with an
 // attached key "foo", then confirms the key is gone.
 func testLeaseRemoveLeasedKey(t *testing.T, act func(*clusterV3, int64) error) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 
 	leaseID, err := acquireLeaseAndKey(clus, "foo")
@@ -1450,7 +1445,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*clusterV3, int64) error) {
 
 	// confirm no key
 	rreq := &pb.RangeRequest{Key: []byte("foo")}
-	rresp, err := pb.NewKVClient(clus.RandConn()).Range(context.TODO(), rreq)
+	rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 15 - 15
integration/v3_lock_test.go

@@ -18,28 +18,28 @@ import (
 	"testing"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/contrib/recipes"
 )
 
 func TestMutexSingleNode(t *testing.T) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
-	testMutex(t, 5, func() *grpc.ClientConn { return clus.conns[0] })
+	testMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
 }
 
 func TestMutexMultiNode(t *testing.T) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
-	testMutex(t, 5, func() *grpc.ClientConn { return clus.RandConn() })
+	testMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
 }
 
-func testMutex(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
+func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
 	// stream lock acquistions
 	lockedC := make(chan *recipe.Mutex, 1)
 	for i := 0; i < waiters; i++ {
 		go func() {
-			m := recipe.NewMutex(recipe.NewEtcdClient(chooseConn()), "test-mutex")
+			m := recipe.NewMutex(chooseClient(), "test-mutex")
 			if err := m.Lock(); err != nil {
 				t.Fatalf("could not wait on lock (%v)", err)
 			}
@@ -68,32 +68,32 @@ func testMutex(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
 
 func BenchmarkMutex4Waiters(b *testing.B) {
 	// XXX switch tests to use TB interface
-	clus := newClusterGRPC(nil, &clusterConfig{size: 3})
+	clus := newClusterV3(nil, &clusterConfig{size: 3})
 	defer clus.Terminate(nil)
 	for i := 0; i < b.N; i++ {
-		testMutex(nil, 4, func() *grpc.ClientConn { return clus.RandConn() })
+		testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() })
 	}
 }
 
 func TestRWMutexSingleNode(t *testing.T) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
-	testRWMutex(t, 5, func() *grpc.ClientConn { return clus.conns[0] })
+	testRWMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
 }
 
 func TestRWMutexMultiNode(t *testing.T) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
-	testRWMutex(t, 5, func() *grpc.ClientConn { return clus.RandConn() })
+	testRWMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
 }
 
-func testRWMutex(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
+func testRWMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
 	// stream rwlock acquistions
 	rlockedC := make(chan *recipe.RWMutex, 1)
 	wlockedC := make(chan *recipe.RWMutex, 1)
 	for i := 0; i < waiters; i++ {
 		go func() {
-			rwm := recipe.NewRWMutex(recipe.NewEtcdClient(chooseConn()), "test-rwmutex")
+			rwm := recipe.NewRWMutex(chooseClient(), "test-rwmutex")
 			if rand.Intn(1) == 0 {
 				if err := rwm.RLock(); err != nil {
 					t.Fatalf("could not rlock (%v)", err)

+ 11 - 11
integration/v3_queue_test.go

@@ -29,7 +29,7 @@ const (
 
 // TestQueueOneReaderOneWriter confirms the queue is FIFO
 func TestQueueOneReaderOneWriter(t *testing.T) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 1})
+	clus := newClusterV3(t, &clusterConfig{size: 1})
 	defer clus.Terminate(t)
 
 	done := make(chan struct{})
@@ -37,7 +37,7 @@ func TestQueueOneReaderOneWriter(t *testing.T) {
 		defer func() {
 			done <- struct{}{}
 		}()
-		etcdc := recipe.NewEtcdClient(clus.RandConn())
+		etcdc := clus.RandClient()
 		q := recipe.NewQueue(etcdc, "testq")
 		for i := 0; i < 5; i++ {
 			if err := q.Enqueue(fmt.Sprintf("%d", i)); err != nil {
@@ -46,7 +46,7 @@ func TestQueueOneReaderOneWriter(t *testing.T) {
 		}
 	}()
 
-	etcdc := recipe.NewEtcdClient(clus.RandConn())
+	etcdc := clus.RandClient()
 	q := recipe.NewQueue(etcdc, "testq")
 	for i := 0; i < 5; i++ {
 		s, err := q.Dequeue()
@@ -75,7 +75,7 @@ func TestQueueManyReaderManyWriter(t *testing.T) {
 // BenchmarkQueue benchmarks Queues using many/many readers/writers
 func BenchmarkQueue(b *testing.B) {
 	// XXX switch tests to use TB interface
-	clus := newClusterGRPC(nil, &clusterConfig{size: 3})
+	clus := newClusterV3(nil, &clusterConfig{size: 3})
 	defer clus.Terminate(nil)
 	for i := 0; i < b.N; i++ {
 		testQueueNReaderMWriter(nil, manyQueueClients, manyQueueClients)
@@ -84,11 +84,11 @@ func BenchmarkQueue(b *testing.B) {
 
 // TestPrQueue tests whether priority queues respect priorities.
 func TestPrQueueOneReaderOneWriter(t *testing.T) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 1})
+	clus := newClusterV3(t, &clusterConfig{size: 1})
 	defer clus.Terminate(t)
 
 	// write out five items with random priority
-	etcdc := recipe.NewEtcdClient(clus.RandConn())
+	etcdc := clus.RandClient()
 	q := recipe.NewPriorityQueue(etcdc, "testprq")
 	for i := 0; i < 5; i++ {
 		// [0, 2] priority for priority collision to test seq keys
@@ -116,7 +116,7 @@ func TestPrQueueOneReaderOneWriter(t *testing.T) {
 }
 
 func TestPrQueueManyReaderManyWriter(t *testing.T) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 	rqs := newPriorityQueues(clus, manyQueueClients)
 	wqs := newPriorityQueues(clus, manyQueueClients)
@@ -126,7 +126,7 @@ func TestPrQueueManyReaderManyWriter(t *testing.T) {
 // BenchmarkQueue benchmarks Queues using n/n readers/writers
 func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
 	// XXX switch tests to use TB interface
-	clus := newClusterGRPC(nil, &clusterConfig{size: 3})
+	clus := newClusterV3(nil, &clusterConfig{size: 3})
 	defer clus.Terminate(nil)
 	rqs := newPriorityQueues(clus, 1)
 	wqs := newPriorityQueues(clus, 1)
@@ -136,14 +136,14 @@ func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
 }
 
 func testQueueNReaderMWriter(t *testing.T, n int, m int) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 	testReadersWriters(t, newQueues(clus, n), newQueues(clus, m))
 }
 
 func newQueues(clus *clusterV3, n int) (qs []testQueue) {
 	for i := 0; i < n; i++ {
-		etcdc := recipe.NewEtcdClient(clus.RandConn())
+		etcdc := clus.RandClient()
 		qs = append(qs, recipe.NewQueue(etcdc, "q"))
 	}
 	return qs
@@ -151,7 +151,7 @@ func newQueues(clus *clusterV3, n int) (qs []testQueue) {
 
 func newPriorityQueues(clus *clusterV3, n int) (qs []testQueue) {
 	for i := 0; i < n; i++ {
-		etcdc := recipe.NewEtcdClient(clus.RandConn())
+		etcdc := clus.RandClient()
 		q := &flatPriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")}
 		qs = append(qs, q)
 	}

+ 7 - 7
integration/v3_stm_test.go

@@ -24,10 +24,10 @@ import (
 
 // TestSTMConflict tests that conflicts are retried.
 func TestSTMConflict(t *testing.T) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	clus := newClusterV3(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 
-	etcdc := recipe.NewEtcdClient(clus.RandConn())
+	etcdc := clus.RandClient()
 	keys := make([]*recipe.RemoteKV, 5)
 	for i := 0; i < len(keys); i++ {
 		rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0)
@@ -39,7 +39,7 @@ func TestSTMConflict(t *testing.T) {
 
 	errc := make([]<-chan error, len(keys))
 	for i, rk := range keys {
-		curEtcdc := recipe.NewEtcdClient(clus.RandConn())
+		curEtcdc := clus.RandClient()
 		srcKey := rk.Key()
 		applyf := func(stm *recipe.STM) error {
 			src, err := stm.Get(srcKey)
@@ -89,10 +89,10 @@ func TestSTMConflict(t *testing.T) {
 
 // TestSTMPut confirms a STM put on a new key is visible after commit.
 func TestSTMPutNewKey(t *testing.T) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 1})
+	clus := newClusterV3(t, &clusterConfig{size: 1})
 	defer clus.Terminate(t)
 
-	etcdc := recipe.NewEtcdClient(clus.RandConn())
+	etcdc := clus.RandClient()
 	applyf := func(stm *recipe.STM) error {
 		stm.Put("foo", "bar")
 		return nil
@@ -113,10 +113,10 @@ func TestSTMPutNewKey(t *testing.T) {
 
 // TestSTMAbort tests that an aborted txn does not modify any keys.
 func TestSTMAbort(t *testing.T) {
-	clus := newClusterGRPC(t, &clusterConfig{size: 1})
+	clus := newClusterV3(t, &clusterConfig{size: 1})
 	defer clus.Terminate(t)
 
-	etcdc := recipe.NewEtcdClient(clus.RandConn())
+	etcdc := clus.RandClient()
 	applyf := func(stm *recipe.STM) error {
 		stm.Put("foo", "baz")
 		stm.Abort()