Browse Source

Merge pull request #4336 from gyuho/clientv3_test

integration: V3 grpc with clientv3 (only Put)
Gyu-Ho Lee 10 years ago
parent
commit
e7f50d8444

+ 63 - 0
clientv3/integration/client_test.go

@@ -0,0 +1,63 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+	"bytes"
+	"testing"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/integration"
+	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+func TestKVPut(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	tests := []struct {
+		key, val string
+		leaseID  lease.LeaseID
+	}{
+		{"foo", "bar", lease.NoLease},
+
+		// TODO: test with leaseID
+	}
+
+	for i, tt := range tests {
+		clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+		defer clus.Terminate(t)
+
+		kv := clientv3.NewKV(clus.RandClient())
+
+		if _, err := kv.Put(tt.key, tt.val, tt.leaseID); err != nil {
+			t.Fatalf("#%d: couldn't put (%v)", i, tt.key, err)
+		}
+
+		resp, err := kv.Get(tt.key, 0)
+		if err != nil {
+			t.Fatalf("#%d: couldn't get key (%v)", i, err)
+		}
+		if len(resp.Kvs) != 1 {
+			t.Fatalf("#%d: expected 1 key, got %d", i, len(resp.Kvs))
+		}
+		if !bytes.Equal([]byte(tt.val), resp.Kvs[0].Value) {
+			t.Errorf("#%d: val = %s, want %s", i, tt.val, resp.Kvs[0].Value)
+		}
+		if tt.leaseID != lease.LeaseID(resp.Kvs[0].Lease) {
+			t.Errorf("#%d: val = %d, want %d", i, tt.leaseID, resp.Kvs[0].Lease)
+		}
+	}
+}

+ 17 - 0
clientv3/integration/doc.go

@@ -0,0 +1,17 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package integration implements tests built upon embedded etcd, and focuses on
+// correctness of etcd client.
+package integration

+ 20 - 0
clientv3/integration/main_test.go

@@ -0,0 +1,20 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package integration
+
+import (
+	"os"
+	"testing"
+
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+func TestMain(m *testing.M) {
+	v := m.Run()
+	if v == 0 && testutil.CheckLeakedGoroutine() {
+		os.Exit(1)
+	}
+	os.Exit(v)
+}

+ 674 - 0
integration/cluster.go

@@ -0,0 +1,674 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.package recipe
+package integration
+
+import (
+	"fmt"
+	"io/ioutil"
+	"math/rand"
+	"net"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"reflect"
+	"sort"
+	"strconv"
+	"strings"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+
+	"github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
+	"github.com/coreos/etcd/etcdserver/etcdhttp"
+	"github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/testutil"
+	"github.com/coreos/etcd/pkg/transport"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/rafthttp"
+)
+
+const (
+	tickDuration   = 10 * time.Millisecond
+	clusterName    = "etcd"
+	requestTimeout = 20 * time.Second
+)
+
+var (
+	electionTicks = 10
+
+	// integration test uses well-known ports to listen for each running member,
+	// which ensures restarted member could listen on specific port again.
+	nextListenPort int64 = 20000
+)
+
+type ClusterConfig struct {
+	Size         int
+	UsePeerTLS   bool
+	DiscoveryURL string
+	UseV3        bool
+	UseGRPC      bool
+}
+
+type cluster struct {
+	cfg     *ClusterConfig
+	Members []*member
+}
+
+func (c *cluster) fillClusterForMembers() error {
+	if c.cfg.DiscoveryURL != "" {
+		// cluster will be discovered
+		return nil
+	}
+
+	addrs := make([]string, 0)
+	for _, m := range c.Members {
+		scheme := "http"
+		if !m.PeerTLSInfo.Empty() {
+			scheme = "https"
+		}
+		for _, l := range m.PeerListeners {
+			addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
+		}
+	}
+	clusterStr := strings.Join(addrs, ",")
+	var err error
+	for _, m := range c.Members {
+		m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func newCluster(t *testing.T, cfg *ClusterConfig) *cluster {
+	c := &cluster{cfg: cfg}
+	ms := make([]*member, cfg.Size)
+	for i := 0; i < cfg.Size; i++ {
+		ms[i] = c.mustNewMember(t)
+	}
+	c.Members = ms
+	if err := c.fillClusterForMembers(); err != nil {
+		t.Fatal(err)
+	}
+
+	return c
+}
+
+// NewCluster returns an unlaunched cluster of the given size which has been
+// set to use static bootstrap.
+func NewCluster(t *testing.T, size int) *cluster {
+	return newCluster(t, &ClusterConfig{Size: size})
+}
+
+// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
+func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster {
+	return newCluster(t, cfg)
+}
+
+func (c *cluster) Launch(t *testing.T) {
+	errc := make(chan error)
+	for _, m := range c.Members {
+		// Members are launched in separate goroutines because if they boot
+		// using discovery url, they have to wait for others to register to continue.
+		go func(m *member) {
+			errc <- m.Launch()
+		}(m)
+	}
+	for range c.Members {
+		if err := <-errc; err != nil {
+			t.Fatalf("error setting up member: %v", err)
+		}
+	}
+	// wait cluster to be stable to receive future client requests
+	c.waitMembersMatch(t, c.HTTPMembers())
+	c.waitVersion()
+}
+
+func (c *cluster) URL(i int) string {
+	return c.Members[i].ClientURLs[0].String()
+}
+
+func (c *cluster) URLs() []string {
+	urls := make([]string, 0)
+	for _, m := range c.Members {
+		for _, u := range m.ClientURLs {
+			urls = append(urls, u.String())
+		}
+	}
+	return urls
+}
+
+func (c *cluster) HTTPMembers() []client.Member {
+	ms := make([]client.Member, len(c.Members))
+	for i, m := range c.Members {
+		scheme := "http"
+		if !m.PeerTLSInfo.Empty() {
+			scheme = "https"
+		}
+		ms[i].Name = m.Name
+		for _, ln := range m.PeerListeners {
+			ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String())
+		}
+		for _, ln := range m.ClientListeners {
+			ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String())
+		}
+	}
+	return ms
+}
+
+func (c *cluster) mustNewMember(t *testing.T) *member {
+	name := c.name(rand.Int())
+	m := mustNewMember(t, name, c.cfg.UsePeerTLS)
+	m.DiscoveryURL = c.cfg.DiscoveryURL
+	m.V3demo = c.cfg.UseV3
+	if c.cfg.UseGRPC {
+		if err := m.listenGRPC(); err != nil {
+			t.Fatal(err)
+		}
+	}
+	return m
+}
+
+func (c *cluster) addMember(t *testing.T) {
+	m := c.mustNewMember(t)
+
+	scheme := "http"
+	if c.cfg.UsePeerTLS {
+		scheme = "https"
+	}
+
+	// send add request to the cluster
+	cc := mustNewHTTPClient(t, []string{c.URL(0)})
+	ma := client.NewMembersAPI(cc)
+	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+	peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
+	if _, err := ma.Add(ctx, peerURL); err != nil {
+		t.Fatalf("add member on %s error: %v", c.URL(0), err)
+	}
+	cancel()
+
+	// wait for the add node entry applied in the cluster
+	members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
+	c.waitMembersMatch(t, members)
+
+	m.InitialPeerURLsMap = types.URLsMap{}
+	for _, mm := range c.Members {
+		m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
+	}
+	m.InitialPeerURLsMap[m.Name] = m.PeerURLs
+	m.NewCluster = false
+	if err := m.Launch(); err != nil {
+		t.Fatal(err)
+	}
+	c.Members = append(c.Members, m)
+	// wait cluster to be stable to receive future client requests
+	c.waitMembersMatch(t, c.HTTPMembers())
+}
+
+func (c *cluster) AddMember(t *testing.T) {
+	c.addMember(t)
+}
+
+func (c *cluster) RemoveMember(t *testing.T, id uint64) {
+	// send remove request to the cluster
+	cc := mustNewHTTPClient(t, c.URLs())
+	ma := client.NewMembersAPI(cc)
+	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+	if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
+		t.Fatalf("unexpected remove error %v", err)
+	}
+	cancel()
+	newMembers := make([]*member, 0)
+	for _, m := range c.Members {
+		if uint64(m.s.ID()) != id {
+			newMembers = append(newMembers, m)
+		} else {
+			select {
+			case <-m.s.StopNotify():
+				m.Terminate(t)
+			// 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
+			// TODO: remove connection write timeout by selecting on http response closeNotifier
+			// blocking on https://github.com/golang/go/issues/9524
+			case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
+				t.Fatalf("failed to remove member %s in time", m.s.ID())
+			}
+		}
+	}
+	c.Members = newMembers
+	c.waitMembersMatch(t, c.HTTPMembers())
+}
+
+func (c *cluster) Terminate(t *testing.T) {
+	for _, m := range c.Members {
+		m.Terminate(t)
+	}
+}
+
+func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
+	for _, u := range c.URLs() {
+		cc := mustNewHTTPClient(t, []string{u})
+		ma := client.NewMembersAPI(cc)
+		for {
+			ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+			ms, err := ma.List(ctx)
+			cancel()
+			if err == nil && isMembersEqual(ms, membs) {
+				break
+			}
+			time.Sleep(tickDuration)
+		}
+	}
+	return
+}
+
+func (c *cluster) waitLeader(t *testing.T, membs []*member) {
+	possibleLead := make(map[uint64]bool)
+	var lead uint64
+	for _, m := range membs {
+		possibleLead[uint64(m.s.ID())] = true
+	}
+
+	for lead == 0 || !possibleLead[lead] {
+		lead = 0
+		for _, m := range membs {
+			if lead != 0 && lead != m.s.Lead() {
+				lead = 0
+				break
+			}
+			lead = m.s.Lead()
+		}
+		time.Sleep(10 * tickDuration)
+	}
+}
+
+func (c *cluster) waitVersion() {
+	for _, m := range c.Members {
+		for {
+			if m.s.ClusterVersion() != nil {
+				break
+			}
+			time.Sleep(tickDuration)
+		}
+	}
+}
+
+func (c *cluster) name(i int) string {
+	return fmt.Sprint("node", i)
+}
+
+// isMembersEqual checks whether two members equal except ID field.
+// The given wmembs should always set ID field to empty string.
+func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
+	sort.Sort(SortableMemberSliceByPeerURLs(membs))
+	sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
+	for i := range membs {
+		membs[i].ID = ""
+	}
+	return reflect.DeepEqual(membs, wmembs)
+}
+
+func newLocalListener(t *testing.T) net.Listener {
+	port := atomic.AddInt64(&nextListenPort, 1)
+	l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(port, 10))
+	if err != nil {
+		t.Fatal(err)
+	}
+	return l
+}
+
+func newListenerWithAddr(t *testing.T, addr string) net.Listener {
+	var err error
+	var l net.Listener
+	// TODO: we want to reuse a previous closed port immediately.
+	// a better way is to set SO_REUSExx instead of doing retry.
+	for i := 0; i < 5; i++ {
+		l, err = net.Listen("tcp", addr)
+		if err == nil {
+			break
+		}
+		time.Sleep(500 * time.Millisecond)
+	}
+	if err != nil {
+		t.Fatal(err)
+	}
+	return l
+}
+
+type member struct {
+	etcdserver.ServerConfig
+	PeerListeners, ClientListeners []net.Listener
+	grpcListener                   net.Listener
+	// inited PeerTLSInfo implies to enable peer TLS
+	PeerTLSInfo transport.TLSInfo
+
+	raftHandler *testutil.PauseableHandler
+	s           *etcdserver.EtcdServer
+	hss         []*httptest.Server
+
+	grpcServer *grpc.Server
+	grpcAddr   string
+}
+
+// mustNewMember return an inited member with the given name. If usePeerTLS is
+// true, it will set PeerTLSInfo and use https scheme to communicate between
+// peers.
+func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
+	var (
+		testTLSInfo = transport.TLSInfo{
+			KeyFile:        "./fixtures/server.key.insecure",
+			CertFile:       "./fixtures/server.crt",
+			TrustedCAFile:  "./fixtures/ca.crt",
+			ClientCertAuth: true,
+		}
+		err error
+	)
+	m := &member{}
+
+	peerScheme := "http"
+	if usePeerTLS {
+		peerScheme = "https"
+	}
+
+	pln := newLocalListener(t)
+	m.PeerListeners = []net.Listener{pln}
+	m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if usePeerTLS {
+		m.PeerTLSInfo = testTLSInfo
+	}
+
+	cln := newLocalListener(t)
+	m.ClientListeners = []net.Listener{cln}
+	m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	m.Name = name
+
+	m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
+	if err != nil {
+		t.Fatal(err)
+	}
+	clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String())
+	m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
+	if err != nil {
+		t.Fatal(err)
+	}
+	m.InitialClusterToken = clusterName
+	m.NewCluster = true
+	m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo
+	m.ElectionTicks = electionTicks
+	m.TickMs = uint(tickDuration / time.Millisecond)
+	return m
+}
+
+// startGRPC starts a grpc server over a unix domain socket on the member
+func (m *member) listenGRPC() error {
+	if m.V3demo == false {
+		return fmt.Errorf("starting grpc server without v3 configured")
+	}
+	m.grpcAddr = m.Name + ".sock"
+	if err := os.RemoveAll(m.grpcAddr); err != nil {
+		return err
+	}
+	l, err := net.Listen("unix", m.grpcAddr)
+	if err != nil {
+		return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
+	}
+	m.grpcListener = l
+	return nil
+}
+
+// NewClientV3 creates a new grpc client connection to the member
+func NewClientV3(m *member) (*clientv3.Client, error) {
+	if m.grpcAddr == "" {
+		return nil, fmt.Errorf("member not configured for grpc")
+	}
+	f := func(a string, t time.Duration) (net.Conn, error) {
+		return net.Dial("unix", a)
+	}
+	unixdialer := grpc.WithDialer(f)
+	conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer)
+	if err != nil {
+		return nil, err
+	}
+	return clientv3.NewFromConn(conn), nil
+}
+
+// Clone returns a member with the same server configuration. The returned
+// member will not set PeerListeners and ClientListeners.
+func (m *member) Clone(t *testing.T) *member {
+	mm := &member{}
+	mm.ServerConfig = m.ServerConfig
+
+	var err error
+	clientURLStrs := m.ClientURLs.StringSlice()
+	mm.ClientURLs, err = types.NewURLs(clientURLStrs)
+	if err != nil {
+		// this should never fail
+		panic(err)
+	}
+	peerURLStrs := m.PeerURLs.StringSlice()
+	mm.PeerURLs, err = types.NewURLs(peerURLStrs)
+	if err != nil {
+		// this should never fail
+		panic(err)
+	}
+	clusterStr := m.InitialPeerURLsMap.String()
+	mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
+	if err != nil {
+		// this should never fail
+		panic(err)
+	}
+	mm.InitialClusterToken = m.InitialClusterToken
+	mm.ElectionTicks = m.ElectionTicks
+	mm.PeerTLSInfo = m.PeerTLSInfo
+	return mm
+}
+
+// Launch starts a member based on ServerConfig, PeerListeners
+// and ClientListeners.
+func (m *member) Launch() error {
+	var err error
+	if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil {
+		return fmt.Errorf("failed to initialize the etcd server: %v", err)
+	}
+	m.s.SyncTicker = time.Tick(500 * time.Millisecond)
+	m.s.Start()
+
+	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
+
+	for _, ln := range m.PeerListeners {
+		hs := &httptest.Server{
+			Listener: ln,
+			Config:   &http.Server{Handler: m.raftHandler},
+		}
+		if m.PeerTLSInfo.Empty() {
+			hs.Start()
+		} else {
+			hs.TLS, err = m.PeerTLSInfo.ServerConfig()
+			if err != nil {
+				return err
+			}
+			hs.StartTLS()
+		}
+		m.hss = append(m.hss, hs)
+	}
+	for _, ln := range m.ClientListeners {
+		hs := &httptest.Server{
+			Listener: ln,
+			Config:   &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
+		}
+		hs.Start()
+		m.hss = append(m.hss, hs)
+	}
+	if m.grpcListener != nil {
+		m.grpcServer = grpc.NewServer()
+		etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s))
+		etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s))
+		etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s))
+		go m.grpcServer.Serve(m.grpcListener)
+	}
+	return nil
+}
+
+func (m *member) WaitOK(t *testing.T) {
+	cc := mustNewHTTPClient(t, []string{m.URL()})
+	kapi := client.NewKeysAPI(cc)
+	for {
+		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+		_, err := kapi.Get(ctx, "/", nil)
+		if err != nil {
+			time.Sleep(tickDuration)
+			continue
+		}
+		cancel()
+		break
+	}
+	for m.s.Leader() == 0 {
+		time.Sleep(tickDuration)
+	}
+}
+
+func (m *member) URL() string { return m.ClientURLs[0].String() }
+
+func (m *member) Pause() {
+	m.raftHandler.Pause()
+	m.s.PauseSending()
+}
+
+func (m *member) Resume() {
+	m.raftHandler.Resume()
+	m.s.ResumeSending()
+}
+
+// Close stops the member's etcdserver and closes its connections
+func (m *member) Close() {
+	if m.grpcServer != nil {
+		m.grpcServer.Stop()
+		m.grpcServer = nil
+	}
+	m.s.Stop()
+	for _, hs := range m.hss {
+		hs.CloseClientConnections()
+		hs.Close()
+	}
+}
+
+// Stop stops the member, but the data dir of the member is preserved.
+func (m *member) Stop(t *testing.T) {
+	m.Close()
+	m.hss = nil
+}
+
+// Restart starts the member using the preserved data dir.
+func (m *member) Restart(t *testing.T) error {
+	newPeerListeners := make([]net.Listener, 0)
+	for _, ln := range m.PeerListeners {
+		newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String()))
+	}
+	m.PeerListeners = newPeerListeners
+	newClientListeners := make([]net.Listener, 0)
+	for _, ln := range m.ClientListeners {
+		newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
+	}
+	m.ClientListeners = newClientListeners
+
+	if m.grpcListener != nil {
+		if err := m.listenGRPC(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	return m.Launch()
+}
+
+// Terminate stops the member and removes the data dir.
+func (m *member) Terminate(t *testing.T) {
+	m.Close()
+	if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func mustNewHTTPClient(t *testing.T, eps []string) client.Client {
+	cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps}
+	c, err := client.New(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return c
+}
+
+func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport {
+	// tick in integration test is short, so 1s dial timeout could play well.
+	tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return tr
+}
+
+type SortableMemberSliceByPeerURLs []client.Member
+
+func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
+func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
+	return p[i].PeerURLs[0] < p[j].PeerURLs[0]
+}
+func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+
+type ClusterV3 struct {
+	*cluster
+	clients []*clientv3.Client
+}
+
+// NewClusterV3 returns a launched cluster with a grpc client connection
+// for each cluster member.
+func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
+	cfg.UseV3 = true
+	cfg.UseGRPC = true
+	clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
+	for _, m := range clus.Members {
+		client, err := NewClientV3(m)
+		if err != nil {
+			t.Fatal(err)
+		}
+		clus.clients = append(clus.clients, client)
+	}
+	clus.Launch(t)
+	return clus
+}
+
+func (c *ClusterV3) Terminate(t *testing.T) {
+	for _, client := range c.clients {
+		if err := client.Close(); err != nil {
+			t.Error(err)
+		}
+	}
+	c.cluster.Terminate(t)
+}
+
+func (c *ClusterV3) RandClient() *clientv3.Client {
+	return c.clients[rand.Intn(len(c.clients))]
+}

+ 7 - 619
integration/cluster_test.go

@@ -16,48 +16,16 @@ package integration
 
 
 import (
 import (
 	"fmt"
 	"fmt"
-	"io/ioutil"
 	"log"
 	"log"
 	"math/rand"
 	"math/rand"
-	"net"
-	"net/http"
-	"net/http/httptest"
 	"os"
 	"os"
-	"reflect"
-	"sort"
 	"strconv"
 	"strconv"
-	"strings"
-	"sync/atomic"
 	"testing"
 	"testing"
-	"time"
 
 
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/client"
-	"github.com/coreos/etcd/clientv3"
-	"github.com/coreos/etcd/etcdserver"
-	"github.com/coreos/etcd/etcdserver/api/v3rpc"
-	"github.com/coreos/etcd/etcdserver/etcdhttp"
-	"github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
-	"github.com/coreos/etcd/pkg/transport"
-	"github.com/coreos/etcd/pkg/types"
-	"github.com/coreos/etcd/rafthttp"
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
-)
-
-const (
-	tickDuration   = 10 * time.Millisecond
-	clusterName    = "etcd"
-	requestTimeout = 20 * time.Second
-)
-
-var (
-	electionTicks = 10
-
-	// integration test uses well-known ports to listen for each running member,
-	// which ensures restarted member could listen on specific port again.
-	nextListenPort int64 = 20000
 )
 )
 
 
 func init() {
 func init() {
@@ -83,7 +51,7 @@ func testCluster(t *testing.T, size int) {
 
 
 func TestTLSClusterOf3(t *testing.T) {
 func TestTLSClusterOf3(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true})
+	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true})
 	c.Launch(t)
 	c.Launch(t)
 	defer c.Terminate(t)
 	defer c.Terminate(t)
 	clusterMustProgress(t, c.Members)
 	clusterMustProgress(t, c.Members)
@@ -108,7 +76,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
 
 
 	c := NewClusterByConfig(
 	c := NewClusterByConfig(
 		t,
 		t,
-		&clusterConfig{size: size, discoveryURL: dc.URL(0) + "/v2/keys"},
+		&ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"},
 	)
 	)
 	c.Launch(t)
 	c.Launch(t)
 	defer c.Terminate(t)
 	defer c.Terminate(t)
@@ -130,10 +98,10 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
 	cancel()
 	cancel()
 
 
 	c := NewClusterByConfig(t,
 	c := NewClusterByConfig(t,
-		&clusterConfig{
-			size:         3,
-			usePeerTLS:   true,
-			discoveryURL: dc.URL(0) + "/v2/keys"},
+		&ClusterConfig{
+			Size:         3,
+			UsePeerTLS:   true,
+			DiscoveryURL: dc.URL(0) + "/v2/keys"},
 	)
 	)
 	c.Launch(t)
 	c.Launch(t)
 	defer c.Terminate(t)
 	defer c.Terminate(t)
@@ -157,7 +125,7 @@ func testDoubleClusterSize(t *testing.T, size int) {
 
 
 func TestDoubleTLSClusterSizeOf3(t *testing.T) {
 func TestDoubleTLSClusterSizeOf3(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true})
+	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true})
 	c.Launch(t)
 	c.Launch(t)
 	defer c.Terminate(t)
 	defer c.Terminate(t)
 
 
@@ -347,583 +315,3 @@ func clusterMustProgress(t *testing.T, membs []*member) {
 		mcancel()
 		mcancel()
 	}
 	}
 }
 }
-
-type clusterConfig struct {
-	size         int
-	usePeerTLS   bool
-	discoveryURL string
-	useV3        bool
-	useGRPC      bool
-}
-
-type cluster struct {
-	cfg     *clusterConfig
-	Members []*member
-}
-
-func (c *cluster) fillClusterForMembers() error {
-	if c.cfg.discoveryURL != "" {
-		// cluster will be discovered
-		return nil
-	}
-
-	addrs := make([]string, 0)
-	for _, m := range c.Members {
-		scheme := "http"
-		if !m.PeerTLSInfo.Empty() {
-			scheme = "https"
-		}
-		for _, l := range m.PeerListeners {
-			addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
-		}
-	}
-	clusterStr := strings.Join(addrs, ",")
-	var err error
-	for _, m := range c.Members {
-		m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func newCluster(t *testing.T, cfg *clusterConfig) *cluster {
-	c := &cluster{cfg: cfg}
-	ms := make([]*member, cfg.size)
-	for i := 0; i < cfg.size; i++ {
-		ms[i] = c.mustNewMember(t)
-	}
-	c.Members = ms
-	if err := c.fillClusterForMembers(); err != nil {
-		t.Fatal(err)
-	}
-
-	return c
-}
-
-// NewCluster returns an unlaunched cluster of the given size which has been
-// set to use static bootstrap.
-func NewCluster(t *testing.T, size int) *cluster {
-	return newCluster(t, &clusterConfig{size: size})
-}
-
-// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
-func NewClusterByConfig(t *testing.T, cfg *clusterConfig) *cluster {
-	return newCluster(t, cfg)
-}
-
-func (c *cluster) Launch(t *testing.T) {
-	errc := make(chan error)
-	for _, m := range c.Members {
-		// Members are launched in separate goroutines because if they boot
-		// using discovery url, they have to wait for others to register to continue.
-		go func(m *member) {
-			errc <- m.Launch()
-		}(m)
-	}
-	for range c.Members {
-		if err := <-errc; err != nil {
-			t.Fatalf("error setting up member: %v", err)
-		}
-	}
-	// wait cluster to be stable to receive future client requests
-	c.waitMembersMatch(t, c.HTTPMembers())
-	c.waitVersion()
-}
-
-func (c *cluster) URL(i int) string {
-	return c.Members[i].ClientURLs[0].String()
-}
-
-func (c *cluster) URLs() []string {
-	urls := make([]string, 0)
-	for _, m := range c.Members {
-		for _, u := range m.ClientURLs {
-			urls = append(urls, u.String())
-		}
-	}
-	return urls
-}
-
-func (c *cluster) HTTPMembers() []client.Member {
-	ms := make([]client.Member, len(c.Members))
-	for i, m := range c.Members {
-		scheme := "http"
-		if !m.PeerTLSInfo.Empty() {
-			scheme = "https"
-		}
-		ms[i].Name = m.Name
-		for _, ln := range m.PeerListeners {
-			ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String())
-		}
-		for _, ln := range m.ClientListeners {
-			ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String())
-		}
-	}
-	return ms
-}
-
-func (c *cluster) mustNewMember(t *testing.T) *member {
-	name := c.name(rand.Int())
-	m := mustNewMember(t, name, c.cfg.usePeerTLS)
-	m.DiscoveryURL = c.cfg.discoveryURL
-	m.V3demo = c.cfg.useV3
-	if c.cfg.useGRPC {
-		if err := m.listenGRPC(); err != nil {
-			t.Fatal(err)
-		}
-	}
-	return m
-}
-
-func (c *cluster) addMember(t *testing.T) {
-	m := c.mustNewMember(t)
-
-	scheme := "http"
-	if c.cfg.usePeerTLS {
-		scheme = "https"
-	}
-
-	// send add request to the cluster
-	cc := mustNewHTTPClient(t, []string{c.URL(0)})
-	ma := client.NewMembersAPI(cc)
-	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
-	peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
-	if _, err := ma.Add(ctx, peerURL); err != nil {
-		t.Fatalf("add member on %s error: %v", c.URL(0), err)
-	}
-	cancel()
-
-	// wait for the add node entry applied in the cluster
-	members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
-	c.waitMembersMatch(t, members)
-
-	m.InitialPeerURLsMap = types.URLsMap{}
-	for _, mm := range c.Members {
-		m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
-	}
-	m.InitialPeerURLsMap[m.Name] = m.PeerURLs
-	m.NewCluster = false
-	if err := m.Launch(); err != nil {
-		t.Fatal(err)
-	}
-	c.Members = append(c.Members, m)
-	// wait cluster to be stable to receive future client requests
-	c.waitMembersMatch(t, c.HTTPMembers())
-}
-
-func (c *cluster) AddMember(t *testing.T) {
-	c.addMember(t)
-}
-
-func (c *cluster) RemoveMember(t *testing.T, id uint64) {
-	// send remove request to the cluster
-	cc := mustNewHTTPClient(t, c.URLs())
-	ma := client.NewMembersAPI(cc)
-	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
-	if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
-		t.Fatalf("unexpected remove error %v", err)
-	}
-	cancel()
-	newMembers := make([]*member, 0)
-	for _, m := range c.Members {
-		if uint64(m.s.ID()) != id {
-			newMembers = append(newMembers, m)
-		} else {
-			select {
-			case <-m.s.StopNotify():
-				m.Terminate(t)
-			// 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
-			// TODO: remove connection write timeout by selecting on http response closeNotifier
-			// blocking on https://github.com/golang/go/issues/9524
-			case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
-				t.Fatalf("failed to remove member %s in time", m.s.ID())
-			}
-		}
-	}
-	c.Members = newMembers
-	c.waitMembersMatch(t, c.HTTPMembers())
-}
-
-func (c *cluster) Terminate(t *testing.T) {
-	for _, m := range c.Members {
-		m.Terminate(t)
-	}
-}
-
-func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
-	for _, u := range c.URLs() {
-		cc := mustNewHTTPClient(t, []string{u})
-		ma := client.NewMembersAPI(cc)
-		for {
-			ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
-			ms, err := ma.List(ctx)
-			cancel()
-			if err == nil && isMembersEqual(ms, membs) {
-				break
-			}
-			time.Sleep(tickDuration)
-		}
-	}
-	return
-}
-
-func (c *cluster) waitLeader(t *testing.T, membs []*member) {
-	possibleLead := make(map[uint64]bool)
-	var lead uint64
-	for _, m := range membs {
-		possibleLead[uint64(m.s.ID())] = true
-	}
-
-	for lead == 0 || !possibleLead[lead] {
-		lead = 0
-		for _, m := range membs {
-			if lead != 0 && lead != m.s.Lead() {
-				lead = 0
-				break
-			}
-			lead = m.s.Lead()
-		}
-		time.Sleep(10 * tickDuration)
-	}
-}
-
-func (c *cluster) waitVersion() {
-	for _, m := range c.Members {
-		for {
-			if m.s.ClusterVersion() != nil {
-				break
-			}
-			time.Sleep(tickDuration)
-		}
-	}
-}
-
-func (c *cluster) name(i int) string {
-	return fmt.Sprint("node", i)
-}
-
-// isMembersEqual checks whether two members equal except ID field.
-// The given wmembs should always set ID field to empty string.
-func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
-	sort.Sort(SortableMemberSliceByPeerURLs(membs))
-	sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
-	for i := range membs {
-		membs[i].ID = ""
-	}
-	return reflect.DeepEqual(membs, wmembs)
-}
-
-func newLocalListener(t *testing.T) net.Listener {
-	port := atomic.AddInt64(&nextListenPort, 1)
-	l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(port, 10))
-	if err != nil {
-		t.Fatal(err)
-	}
-	return l
-}
-
-func newListenerWithAddr(t *testing.T, addr string) net.Listener {
-	var err error
-	var l net.Listener
-	// TODO: we want to reuse a previous closed port immediately.
-	// a better way is to set SO_REUSExx instead of doing retry.
-	for i := 0; i < 5; i++ {
-		l, err = net.Listen("tcp", addr)
-		if err == nil {
-			break
-		}
-		time.Sleep(500 * time.Millisecond)
-	}
-	if err != nil {
-		t.Fatal(err)
-	}
-	return l
-}
-
-type member struct {
-	etcdserver.ServerConfig
-	PeerListeners, ClientListeners []net.Listener
-	grpcListener                   net.Listener
-	// inited PeerTLSInfo implies to enable peer TLS
-	PeerTLSInfo transport.TLSInfo
-
-	raftHandler *testutil.PauseableHandler
-	s           *etcdserver.EtcdServer
-	hss         []*httptest.Server
-
-	grpcServer *grpc.Server
-	grpcAddr   string
-}
-
-// mustNewMember return an inited member with the given name. If usePeerTLS is
-// true, it will set PeerTLSInfo and use https scheme to communicate between
-// peers.
-func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
-	var (
-		testTLSInfo = transport.TLSInfo{
-			KeyFile:        "./fixtures/server.key.insecure",
-			CertFile:       "./fixtures/server.crt",
-			TrustedCAFile:  "./fixtures/ca.crt",
-			ClientCertAuth: true,
-		}
-		err error
-	)
-	m := &member{}
-
-	peerScheme := "http"
-	if usePeerTLS {
-		peerScheme = "https"
-	}
-
-	pln := newLocalListener(t)
-	m.PeerListeners = []net.Listener{pln}
-	m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
-	if err != nil {
-		t.Fatal(err)
-	}
-	if usePeerTLS {
-		m.PeerTLSInfo = testTLSInfo
-	}
-
-	cln := newLocalListener(t)
-	m.ClientListeners = []net.Listener{cln}
-	m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	m.Name = name
-
-	m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
-	if err != nil {
-		t.Fatal(err)
-	}
-	clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String())
-	m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
-	if err != nil {
-		t.Fatal(err)
-	}
-	m.InitialClusterToken = clusterName
-	m.NewCluster = true
-	m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo
-	m.ElectionTicks = electionTicks
-	m.TickMs = uint(tickDuration / time.Millisecond)
-	return m
-}
-
-// startGRPC starts a grpc server over a unix domain socket on the member
-func (m *member) listenGRPC() error {
-	if m.V3demo == false {
-		return fmt.Errorf("starting grpc server without v3 configured")
-	}
-	m.grpcAddr = m.Name + ".sock"
-	if err := os.RemoveAll(m.grpcAddr); err != nil {
-		return err
-	}
-	l, err := net.Listen("unix", m.grpcAddr)
-	if err != nil {
-		return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
-	}
-	m.grpcListener = l
-	return nil
-}
-
-// NewClientV3 creates a new grpc client connection to the member
-func NewClientV3(m *member) (*clientv3.Client, error) {
-	if m.grpcAddr == "" {
-		return nil, fmt.Errorf("member not configured for grpc")
-	}
-	f := func(a string, t time.Duration) (net.Conn, error) {
-		return net.Dial("unix", a)
-	}
-	unixdialer := grpc.WithDialer(f)
-	conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer)
-	if err != nil {
-		return nil, err
-	}
-	return clientv3.NewFromConn(conn), nil
-}
-
-// Clone returns a member with the same server configuration. The returned
-// member will not set PeerListeners and ClientListeners.
-func (m *member) Clone(t *testing.T) *member {
-	mm := &member{}
-	mm.ServerConfig = m.ServerConfig
-
-	var err error
-	clientURLStrs := m.ClientURLs.StringSlice()
-	mm.ClientURLs, err = types.NewURLs(clientURLStrs)
-	if err != nil {
-		// this should never fail
-		panic(err)
-	}
-	peerURLStrs := m.PeerURLs.StringSlice()
-	mm.PeerURLs, err = types.NewURLs(peerURLStrs)
-	if err != nil {
-		// this should never fail
-		panic(err)
-	}
-	clusterStr := m.InitialPeerURLsMap.String()
-	mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
-	if err != nil {
-		// this should never fail
-		panic(err)
-	}
-	mm.InitialClusterToken = m.InitialClusterToken
-	mm.ElectionTicks = m.ElectionTicks
-	mm.PeerTLSInfo = m.PeerTLSInfo
-	return mm
-}
-
-// Launch starts a member based on ServerConfig, PeerListeners
-// and ClientListeners.
-func (m *member) Launch() error {
-	var err error
-	if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil {
-		return fmt.Errorf("failed to initialize the etcd server: %v", err)
-	}
-	m.s.SyncTicker = time.Tick(500 * time.Millisecond)
-	m.s.Start()
-
-	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
-
-	for _, ln := range m.PeerListeners {
-		hs := &httptest.Server{
-			Listener: ln,
-			Config:   &http.Server{Handler: m.raftHandler},
-		}
-		if m.PeerTLSInfo.Empty() {
-			hs.Start()
-		} else {
-			hs.TLS, err = m.PeerTLSInfo.ServerConfig()
-			if err != nil {
-				return err
-			}
-			hs.StartTLS()
-		}
-		m.hss = append(m.hss, hs)
-	}
-	for _, ln := range m.ClientListeners {
-		hs := &httptest.Server{
-			Listener: ln,
-			Config:   &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
-		}
-		hs.Start()
-		m.hss = append(m.hss, hs)
-	}
-	if m.grpcListener != nil {
-		m.grpcServer = grpc.NewServer()
-		etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s))
-		etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s))
-		etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s))
-		go m.grpcServer.Serve(m.grpcListener)
-	}
-	return nil
-}
-
-func (m *member) WaitOK(t *testing.T) {
-	cc := mustNewHTTPClient(t, []string{m.URL()})
-	kapi := client.NewKeysAPI(cc)
-	for {
-		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
-		_, err := kapi.Get(ctx, "/", nil)
-		if err != nil {
-			time.Sleep(tickDuration)
-			continue
-		}
-		cancel()
-		break
-	}
-	for m.s.Leader() == 0 {
-		time.Sleep(tickDuration)
-	}
-}
-
-func (m *member) URL() string { return m.ClientURLs[0].String() }
-
-func (m *member) Pause() {
-	m.raftHandler.Pause()
-	m.s.PauseSending()
-}
-
-func (m *member) Resume() {
-	m.raftHandler.Resume()
-	m.s.ResumeSending()
-}
-
-// Close stops the member's etcdserver and closes its connections
-func (m *member) Close() {
-	if m.grpcServer != nil {
-		m.grpcServer.Stop()
-		m.grpcServer = nil
-	}
-	m.s.Stop()
-	for _, hs := range m.hss {
-		hs.CloseClientConnections()
-		hs.Close()
-	}
-}
-
-// Stop stops the member, but the data dir of the member is preserved.
-func (m *member) Stop(t *testing.T) {
-	m.Close()
-	m.hss = nil
-}
-
-// Restart starts the member using the preserved data dir.
-func (m *member) Restart(t *testing.T) error {
-	newPeerListeners := make([]net.Listener, 0)
-	for _, ln := range m.PeerListeners {
-		newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String()))
-	}
-	m.PeerListeners = newPeerListeners
-	newClientListeners := make([]net.Listener, 0)
-	for _, ln := range m.ClientListeners {
-		newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
-	}
-	m.ClientListeners = newClientListeners
-
-	if m.grpcListener != nil {
-		if err := m.listenGRPC(); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	return m.Launch()
-}
-
-// Terminate stops the member and removes the data dir.
-func (m *member) Terminate(t *testing.T) {
-	m.Close()
-	if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
-		t.Fatal(err)
-	}
-}
-
-func mustNewHTTPClient(t *testing.T, eps []string) client.Client {
-	cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps}
-	c, err := client.New(cfg)
-	if err != nil {
-		t.Fatal(err)
-	}
-	return c
-}
-
-func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport {
-	// tick in integration test is short, so 1s dial timeout could play well.
-	tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
-	if err != nil {
-		t.Fatal(err)
-	}
-	return tr
-}
-
-type SortableMemberSliceByPeerURLs []client.Member
-
-func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
-func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
-	return p[i].PeerURLs[0] < p[j].PeerURLs[0]
-}
-func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

+ 2 - 2
integration/v3_barrier_test.go

@@ -24,14 +24,14 @@ import (
 
 
 func TestBarrierSingleNode(t *testing.T) {
 func TestBarrierSingleNode(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	testBarrier(t, 5, func() *clientv3.Client { return clus.clients[0] })
 	testBarrier(t, 5, func() *clientv3.Client { return clus.clients[0] })
 }
 }
 
 
 func TestBarrierMultiNode(t *testing.T) {
 func TestBarrierMultiNode(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	testBarrier(t, 5, func() *clientv3.Client { return clus.RandClient() })
 	testBarrier(t, 5, func() *clientv3.Client { return clus.RandClient() })
 }
 }

+ 3 - 3
integration/v3_double_barrier_test.go

@@ -21,7 +21,7 @@ import (
 )
 )
 
 
 func TestDoubleBarrier(t *testing.T) {
 func TestDoubleBarrier(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	defer closeSessionLease(clus)
 	defer closeSessionLease(clus)
 
 
@@ -82,7 +82,7 @@ func TestDoubleBarrier(t *testing.T) {
 }
 }
 
 
 func TestDoubleBarrierFailover(t *testing.T) {
 func TestDoubleBarrierFailover(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	defer closeSessionLease(clus)
 	defer closeSessionLease(clus)
 
 
@@ -122,7 +122,7 @@ func TestDoubleBarrierFailover(t *testing.T) {
 	}
 	}
 }
 }
 
 
-func closeSessionLease(clus *clusterV3) {
+func closeSessionLease(clus *ClusterV3) {
 	for _, client := range clus.clients {
 	for _, client := range clus.clients {
 		recipe.StopSessionLease(client)
 		recipe.StopSessionLease(client)
 	}
 	}

+ 2 - 2
integration/v3_election_test.go

@@ -23,7 +23,7 @@ import (
 
 
 // TestElectionWait tests if followers can correcty wait for elections.
 // TestElectionWait tests if followers can correcty wait for elections.
 func TestElectionWait(t *testing.T) {
 func TestElectionWait(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	defer closeSessionLease(clus)
 	defer closeSessionLease(clus)
 
 
@@ -86,7 +86,7 @@ func TestElectionWait(t *testing.T) {
 
 
 // TestElectionFailover tests that an election will
 // TestElectionFailover tests that an election will
 func TestElectionFailover(t *testing.T) {
 func TestElectionFailover(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	defer closeSessionLease(clus)
 	defer closeSessionLease(clus)
 
 

+ 20 - 57
integration/v3_grpc_test.go

@@ -16,7 +16,6 @@ package integration
 import (
 import (
 	"bytes"
 	"bytes"
 	"fmt"
 	"fmt"
-	"math/rand"
 	"reflect"
 	"reflect"
 	"sort"
 	"sort"
 	"sync"
 	"sync"
@@ -24,7 +23,6 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/lease"
@@ -32,46 +30,11 @@ import (
 	"github.com/coreos/etcd/storage/storagepb"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 )
 
 
-type clusterV3 struct {
-	*cluster
-	clients []*clientv3.Client
-}
-
-// newClusterV3 returns a launched cluster with a grpc client connection
-// for each cluster member.
-func newClusterV3(t *testing.T, cfg *clusterConfig) *clusterV3 {
-	cfg.useV3 = true
-	cfg.useGRPC = true
-	clus := &clusterV3{cluster: NewClusterByConfig(t, cfg)}
-	for _, m := range clus.Members {
-		client, err := NewClientV3(m)
-		if err != nil {
-			t.Fatal(err)
-		}
-		clus.clients = append(clus.clients, client)
-	}
-	clus.Launch(t)
-	return clus
-}
-
-func (c *clusterV3) Terminate(t *testing.T) {
-	for _, client := range c.clients {
-		if err := client.Close(); err != nil {
-			t.Error(err)
-		}
-	}
-	c.cluster.Terminate(t)
-}
-
-func (c *clusterV3) RandClient() *clientv3.Client {
-	return c.clients[rand.Intn(len(c.clients))]
-}
-
 // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
 // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
 // overwrites it, then checks that the change was applied.
 // overwrites it, then checks that the change was applied.
 func TestV3PutOverwrite(t *testing.T) {
 func TestV3PutOverwrite(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	kvc := clus.RandClient().KV
 	kvc := clus.RandClient().KV
@@ -115,7 +78,7 @@ func TestV3PutOverwrite(t *testing.T) {
 
 
 func TestV3TxnTooManyOps(t *testing.T) {
 func TestV3TxnTooManyOps(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	kvc := clus.RandClient().KV
 	kvc := clus.RandClient().KV
@@ -173,7 +136,7 @@ func TestV3TxnTooManyOps(t *testing.T) {
 // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
 // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
 func TestV3PutMissingLease(t *testing.T) {
 func TestV3PutMissingLease(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	kvc := clus.RandClient().KV
 	kvc := clus.RandClient().KV
@@ -290,7 +253,7 @@ func TestV3DeleteRange(t *testing.T) {
 	}
 	}
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
-		clus := newClusterV3(t, &clusterConfig{size: 3})
+		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 		kvc := clus.RandClient().KV
 		kvc := clus.RandClient().KV
 
 
 		ks := tt.keySet
 		ks := tt.keySet
@@ -336,7 +299,7 @@ func TestV3DeleteRange(t *testing.T) {
 // TestV3TxnInvaildRange tests txn
 // TestV3TxnInvaildRange tests txn
 func TestV3TxnInvaildRange(t *testing.T) {
 func TestV3TxnInvaildRange(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	kvc := clus.RandClient().KV
 	kvc := clus.RandClient().KV
@@ -553,7 +516,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 	}
 	}
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
-		clus := newClusterV3(t, &clusterConfig{size: 3})
+		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 
 
 		wAPI := clus.RandClient().Watch
 		wAPI := clus.RandClient().Watch
 		ctx, cancel := context.WithCancel(context.Background())
 		ctx, cancel := context.WithCancel(context.Background())
@@ -629,7 +592,7 @@ func TestV3WatchCancelUnsynced(t *testing.T) {
 }
 }
 
 
 func testV3WatchCancel(t *testing.T, startRev int64) {
 func testV3WatchCancel(t *testing.T, startRev int64) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 
 
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	defer cancel()
@@ -697,7 +660,7 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) {
 // that matches all watchers, and another key that matches only
 // that matches all watchers, and another key that matches only
 // one watcher to test if it receives expected events.
 // one watcher to test if it receives expected events.
 func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
 func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	kvc := clus.RandClient().KV
 	kvc := clus.RandClient().KV
 
 
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
@@ -799,7 +762,7 @@ func TestV3WatchMultipleEventsTxnUnsynced(t *testing.T) {
 
 
 // testV3WatchMultipleEventsTxn tests Watch APIs when it receives multiple events.
 // testV3WatchMultipleEventsTxn tests Watch APIs when it receives multiple events.
 func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
 func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 
 
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	defer cancel()
@@ -882,7 +845,7 @@ func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.
 
 
 func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
 func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	kvc := clus.RandClient().KV
 	kvc := clus.RandClient().KV
@@ -971,7 +934,7 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) {
 
 
 // testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
 // testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
 func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
 func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	wAPI := clus.RandClient().Watch
 	wAPI := clus.RandClient().Watch
 	kvc := clus.RandClient().KV
 	kvc := clus.RandClient().KV
 
 
@@ -1195,7 +1158,7 @@ func TestV3RangeRequest(t *testing.T) {
 	}
 	}
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
-		clus := newClusterV3(t, &clusterConfig{size: 3})
+		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 		for _, k := range tt.putKeys {
 		for _, k := range tt.putKeys {
 			kvc := clus.RandClient().KV
 			kvc := clus.RandClient().KV
 			req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
 			req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
@@ -1239,7 +1202,7 @@ func TestV3RangeRequest(t *testing.T) {
 // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
 // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
 func TestV3LeaseRevoke(t *testing.T) {
 func TestV3LeaseRevoke(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
+	testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
 		lc := clus.RandClient().Lease
 		lc := clus.RandClient().Lease
 		_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
 		_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
 		return err
 		return err
@@ -1249,7 +1212,7 @@ func TestV3LeaseRevoke(t *testing.T) {
 // TestV3LeaseCreateById ensures leases may be created by a given id.
 // TestV3LeaseCreateById ensures leases may be created by a given id.
 func TestV3LeaseCreateByID(t *testing.T) {
 func TestV3LeaseCreateByID(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	// create fixed lease
 	// create fixed lease
@@ -1290,7 +1253,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
 // TestV3LeaseExpire ensures a key is deleted once a key expires.
 // TestV3LeaseExpire ensures a key is deleted once a key expires.
 func TestV3LeaseExpire(t *testing.T) {
 func TestV3LeaseExpire(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
+	testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
 		// let lease lapse; wait for deleted key
 		// let lease lapse; wait for deleted key
 
 
 		ctx, cancel := context.WithCancel(context.Background())
 		ctx, cancel := context.WithCancel(context.Background())
@@ -1342,7 +1305,7 @@ func TestV3LeaseExpire(t *testing.T) {
 // TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
 // TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
 func TestV3LeaseKeepAlive(t *testing.T) {
 func TestV3LeaseKeepAlive(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
+	testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
 		lc := clus.RandClient().Lease
 		lc := clus.RandClient().Lease
 		lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
 		lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
 		ctx, cancel := context.WithCancel(context.Background())
 		ctx, cancel := context.WithCancel(context.Background())
@@ -1376,7 +1339,7 @@ func TestV3LeaseKeepAlive(t *testing.T) {
 // client to confirm it's visible to the whole cluster.
 // client to confirm it's visible to the whole cluster.
 func TestV3LeaseExists(t *testing.T) {
 func TestV3LeaseExists(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	// create lease
 	// create lease
@@ -1409,7 +1372,7 @@ func TestV3LeaseExists(t *testing.T) {
 }
 }
 
 
 // acquireLeaseAndKey creates a new lease and creates an attached key.
 // acquireLeaseAndKey creates a new lease and creates an attached key.
-func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
+func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
 	// create lease
 	// create lease
 	lresp, err := clus.RandClient().Lease.LeaseCreate(
 	lresp, err := clus.RandClient().Lease.LeaseCreate(
 		context.TODO(),
 		context.TODO(),
@@ -1430,8 +1393,8 @@ func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
 
 
 // testLeaseRemoveLeasedKey performs some action while holding a lease with an
 // testLeaseRemoveLeasedKey performs some action while holding a lease with an
 // attached key "foo", then confirms the key is gone.
 // attached key "foo", then confirms the key is gone.
-func testLeaseRemoveLeasedKey(t *testing.T, act func(*clusterV3, int64) error) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	leaseID, err := acquireLeaseAndKey(clus, "foo")
 	leaseID, err := acquireLeaseAndKey(clus, "foo")

+ 5 - 5
integration/v3_lock_test.go

@@ -23,13 +23,13 @@ import (
 )
 )
 
 
 func TestMutexSingleNode(t *testing.T) {
 func TestMutexSingleNode(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	testMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
 	testMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
 }
 }
 
 
 func TestMutexMultiNode(t *testing.T) {
 func TestMutexMultiNode(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	testMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
 	testMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
 }
 }
@@ -68,7 +68,7 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
 
 
 func BenchmarkMutex4Waiters(b *testing.B) {
 func BenchmarkMutex4Waiters(b *testing.B) {
 	// XXX switch tests to use TB interface
 	// XXX switch tests to use TB interface
-	clus := newClusterV3(nil, &clusterConfig{size: 3})
+	clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
 	defer clus.Terminate(nil)
 	defer clus.Terminate(nil)
 	for i := 0; i < b.N; i++ {
 	for i := 0; i < b.N; i++ {
 		testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() })
 		testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() })
@@ -76,13 +76,13 @@ func BenchmarkMutex4Waiters(b *testing.B) {
 }
 }
 
 
 func TestRWMutexSingleNode(t *testing.T) {
 func TestRWMutexSingleNode(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	testRWMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
 	testRWMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
 }
 }
 
 
 func TestRWMutexMultiNode(t *testing.T) {
 func TestRWMutexMultiNode(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	testRWMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
 	testRWMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
 }
 }

+ 8 - 8
integration/v3_queue_test.go

@@ -29,7 +29,7 @@ const (
 
 
 // TestQueueOneReaderOneWriter confirms the queue is FIFO
 // TestQueueOneReaderOneWriter confirms the queue is FIFO
 func TestQueueOneReaderOneWriter(t *testing.T) {
 func TestQueueOneReaderOneWriter(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 1})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	done := make(chan struct{})
 	done := make(chan struct{})
@@ -75,7 +75,7 @@ func TestQueueManyReaderManyWriter(t *testing.T) {
 // BenchmarkQueue benchmarks Queues using many/many readers/writers
 // BenchmarkQueue benchmarks Queues using many/many readers/writers
 func BenchmarkQueue(b *testing.B) {
 func BenchmarkQueue(b *testing.B) {
 	// XXX switch tests to use TB interface
 	// XXX switch tests to use TB interface
-	clus := newClusterV3(nil, &clusterConfig{size: 3})
+	clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
 	defer clus.Terminate(nil)
 	defer clus.Terminate(nil)
 	for i := 0; i < b.N; i++ {
 	for i := 0; i < b.N; i++ {
 		testQueueNReaderMWriter(nil, manyQueueClients, manyQueueClients)
 		testQueueNReaderMWriter(nil, manyQueueClients, manyQueueClients)
@@ -84,7 +84,7 @@ func BenchmarkQueue(b *testing.B) {
 
 
 // TestPrQueue tests whether priority queues respect priorities.
 // TestPrQueue tests whether priority queues respect priorities.
 func TestPrQueueOneReaderOneWriter(t *testing.T) {
 func TestPrQueueOneReaderOneWriter(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 1})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	// write out five items with random priority
 	// write out five items with random priority
@@ -116,7 +116,7 @@ func TestPrQueueOneReaderOneWriter(t *testing.T) {
 }
 }
 
 
 func TestPrQueueManyReaderManyWriter(t *testing.T) {
 func TestPrQueueManyReaderManyWriter(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	rqs := newPriorityQueues(clus, manyQueueClients)
 	rqs := newPriorityQueues(clus, manyQueueClients)
 	wqs := newPriorityQueues(clus, manyQueueClients)
 	wqs := newPriorityQueues(clus, manyQueueClients)
@@ -126,7 +126,7 @@ func TestPrQueueManyReaderManyWriter(t *testing.T) {
 // BenchmarkQueue benchmarks Queues using n/n readers/writers
 // BenchmarkQueue benchmarks Queues using n/n readers/writers
 func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
 func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
 	// XXX switch tests to use TB interface
 	// XXX switch tests to use TB interface
-	clus := newClusterV3(nil, &clusterConfig{size: 3})
+	clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
 	defer clus.Terminate(nil)
 	defer clus.Terminate(nil)
 	rqs := newPriorityQueues(clus, 1)
 	rqs := newPriorityQueues(clus, 1)
 	wqs := newPriorityQueues(clus, 1)
 	wqs := newPriorityQueues(clus, 1)
@@ -136,12 +136,12 @@ func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
 }
 }
 
 
 func testQueueNReaderMWriter(t *testing.T, n int, m int) {
 func testQueueNReaderMWriter(t *testing.T, n int, m int) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 	testReadersWriters(t, newQueues(clus, n), newQueues(clus, m))
 	testReadersWriters(t, newQueues(clus, n), newQueues(clus, m))
 }
 }
 
 
-func newQueues(clus *clusterV3, n int) (qs []testQueue) {
+func newQueues(clus *ClusterV3, n int) (qs []testQueue) {
 	for i := 0; i < n; i++ {
 	for i := 0; i < n; i++ {
 		etcdc := clus.RandClient()
 		etcdc := clus.RandClient()
 		qs = append(qs, recipe.NewQueue(etcdc, "q"))
 		qs = append(qs, recipe.NewQueue(etcdc, "q"))
@@ -149,7 +149,7 @@ func newQueues(clus *clusterV3, n int) (qs []testQueue) {
 	return qs
 	return qs
 }
 }
 
 
-func newPriorityQueues(clus *clusterV3, n int) (qs []testQueue) {
+func newPriorityQueues(clus *ClusterV3, n int) (qs []testQueue) {
 	for i := 0; i < n; i++ {
 	for i := 0; i < n; i++ {
 		etcdc := clus.RandClient()
 		etcdc := clus.RandClient()
 		q := &flatPriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")}
 		q := &flatPriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")}

+ 3 - 3
integration/v3_stm_test.go

@@ -24,7 +24,7 @@ import (
 
 
 // TestSTMConflict tests that conflicts are retried.
 // TestSTMConflict tests that conflicts are retried.
 func TestSTMConflict(t *testing.T) {
 func TestSTMConflict(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	etcdc := clus.RandClient()
 	etcdc := clus.RandClient()
@@ -89,7 +89,7 @@ func TestSTMConflict(t *testing.T) {
 
 
 // TestSTMPut confirms a STM put on a new key is visible after commit.
 // TestSTMPut confirms a STM put on a new key is visible after commit.
 func TestSTMPutNewKey(t *testing.T) {
 func TestSTMPutNewKey(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 1})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	etcdc := clus.RandClient()
 	etcdc := clus.RandClient()
@@ -113,7 +113,7 @@ func TestSTMPutNewKey(t *testing.T) {
 
 
 // TestSTMAbort tests that an aborted txn does not modify any keys.
 // TestSTMAbort tests that an aborted txn does not modify any keys.
 func TestSTMAbort(t *testing.T) {
 func TestSTMAbort(t *testing.T) {
-	clus := newClusterV3(t, &clusterConfig{size: 1})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	etcdc := clus.RandClient()
 	etcdc := clus.RandClient()

+ 2 - 1
test

@@ -20,7 +20,7 @@ TESTABLE_AND_FORMATTABLE="client clientv3 discovery error etcdctl/command etcdma
 # TODO: add it to race testing when the issue is resolved
 # TODO: add it to race testing when the issue is resolved
 # https://github.com/golang/go/issues/9946
 # https://github.com/golang/go/issues/9946
 NO_RACE_TESTABLE="rafthttp"
 NO_RACE_TESTABLE="rafthttp"
-FORMATTABLE="$TESTABLE_AND_FORMATTABLE $NO_RACE_TESTABLE *.go etcdctl/ integration e2e"
+FORMATTABLE="$TESTABLE_AND_FORMATTABLE $NO_RACE_TESTABLE *.go etcdctl/ integration clientv3/integration e2e"
 
 
 # user has not provided PKG override
 # user has not provided PKG override
 if [ -z "$PKG" ]; then
 if [ -z "$PKG" ]; then
@@ -60,6 +60,7 @@ function integration_tests {
 	echo "Running integration tests..."
 	echo "Running integration tests..."
 	go test -timeout 5m -v -cpu 1,2,4 $@ ${REPO_PATH}/e2e
 	go test -timeout 5m -v -cpu 1,2,4 $@ ${REPO_PATH}/e2e
 	go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/integration
 	go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/integration
+	go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
 	go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
 	go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
 }
 }