Browse Source

Merge pull request #8277 from heyitsanthony/test-e2e-grpcproxy

e2e grpcproxy tests
Anthony Romano 8 years ago
parent
commit
2eb9353019

+ 21 - 0
e2e/cluster_direct_test.go

@@ -0,0 +1,21 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !cluster_proxy
+
+package e2e
+
+func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
+	return newEtcdServerProcess(cfg)
+}

+ 278 - 0
e2e/cluster_proxy_test.go

@@ -0,0 +1,278 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build cluster_proxy
+
+package e2e
+
+import (
+	"fmt"
+	"net"
+	"net/url"
+	"os"
+	"strconv"
+	"strings"
+
+	"github.com/coreos/etcd/pkg/expect"
+)
+
+type proxyEtcdProcess struct {
+	etcdProc etcdProcess
+	proxyV2  *proxyV2Proc
+	proxyV3  *proxyV3Proc
+}
+
+func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
+	return newProxyEtcdProcess(cfg)
+}
+
+func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error) {
+	ep, err := newEtcdServerProcess(cfg)
+	if err != nil {
+		return nil, err
+	}
+	pep := &proxyEtcdProcess{
+		etcdProc: ep,
+		proxyV2:  newProxyV2Proc(cfg),
+		proxyV3:  newProxyV3Proc(cfg),
+	}
+	return pep, nil
+}
+
+func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }
+
+func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() }
+func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() }
+
+func (p *proxyEtcdProcess) Start() error {
+	if err := p.etcdProc.Start(); err != nil {
+		return err
+	}
+	if err := p.proxyV2.Start(); err != nil {
+		return err
+	}
+	return p.proxyV3.Start()
+}
+
+func (p *proxyEtcdProcess) Restart() error {
+	if err := p.etcdProc.Restart(); err != nil {
+		return err
+	}
+	if err := p.proxyV2.Restart(); err != nil {
+		return err
+	}
+	return p.proxyV3.Restart()
+}
+
+func (p *proxyEtcdProcess) Stop() error {
+	err := p.proxyV2.Stop()
+	if v3err := p.proxyV3.Stop(); err == nil {
+		err = v3err
+	}
+	if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
+		// fails on go-grpc issue #1384
+		if !strings.Contains(eerr.Error(), "exit status 2") {
+			err = eerr
+		}
+	}
+	return err
+}
+
+func (p *proxyEtcdProcess) Close() error {
+	err := p.proxyV2.Close()
+	if v3err := p.proxyV3.Close(); err == nil {
+		err = v3err
+	}
+	if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
+		// fails on go-grpc issue #1384
+		if !strings.Contains(eerr.Error(), "exit status 2") {
+			err = eerr
+		}
+	}
+	return err
+}
+
+func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
+	p.proxyV3.WithStopSignal(sig)
+	p.proxyV3.WithStopSignal(sig)
+	return p.etcdProc.WithStopSignal(sig)
+}
+
+type proxyProc struct {
+	execPath string
+	args     []string
+	ep       string
+	donec    chan struct{}
+
+	proc *expect.ExpectProcess
+}
+
+func (pp *proxyProc) endpoints() []string { return []string{pp.ep} }
+
+func (pp *proxyProc) start() error {
+	if pp.proc != nil {
+		panic("already started")
+	}
+	proc, err := spawnCmd(append([]string{pp.execPath}, pp.args...))
+	if err != nil {
+		return err
+	}
+	pp.proc = proc
+	return nil
+}
+
+func (pp *proxyProc) waitReady(readyStr string) error {
+	defer close(pp.donec)
+	return waitReadyExpectProc(pp.proc, []string{readyStr})
+}
+
+func (pp *proxyProc) Stop() error {
+	if pp.proc == nil {
+		return nil
+	}
+	if err := pp.proc.Stop(); err != nil && !strings.Contains(err.Error(), "exit status 1") {
+		// v2proxy exits with status 1 on auto tls; not sure why
+		return err
+	}
+	pp.proc = nil
+	<-pp.donec
+	pp.donec = make(chan struct{})
+	return nil
+}
+
+func (pp *proxyProc) WithStopSignal(sig os.Signal) os.Signal {
+	ret := pp.proc.StopSignal
+	pp.proc.StopSignal = sig
+	return ret
+}
+
+func (pp *proxyProc) Close() error { return pp.Stop() }
+
+type proxyV2Proc struct {
+	proxyProc
+	dataDir string
+}
+
+func proxyListenURL(cfg *etcdServerProcessConfig, portOffset int) string {
+	u, err := url.Parse(cfg.acurl)
+	if err != nil {
+		panic(err)
+	}
+	host, port, _ := net.SplitHostPort(u.Host)
+	p, _ := strconv.ParseInt(port, 10, 16)
+	u.Host = fmt.Sprintf("%s:%d", host, int(p)+portOffset)
+	return u.String()
+}
+
+func newProxyV2Proc(cfg *etcdServerProcessConfig) *proxyV2Proc {
+	listenAddr := proxyListenURL(cfg, 2)
+	name := fmt.Sprintf("testname-proxy-%p", cfg)
+	args := []string{
+		"--name", name,
+		"--proxy", "on",
+		"--listen-client-urls", listenAddr,
+		"--initial-cluster", cfg.name + "=" + cfg.purl.String(),
+	}
+	return &proxyV2Proc{
+		proxyProc{
+			execPath: cfg.execPath,
+			args:     append(args, cfg.tlsArgs...),
+			ep:       listenAddr,
+			donec:    make(chan struct{}),
+		},
+		name + ".etcd",
+	}
+}
+
+func (v2p *proxyV2Proc) Start() error {
+	os.RemoveAll(v2p.dataDir)
+	if err := v2p.start(); err != nil {
+		return err
+	}
+	return v2p.waitReady("httpproxy: endpoints found")
+}
+
+func (v2p *proxyV2Proc) Restart() error {
+	if err := v2p.Stop(); err != nil {
+		return err
+	}
+	return v2p.Start()
+}
+
+func (v2p *proxyV2Proc) Stop() error {
+	if err := v2p.proxyProc.Stop(); err != nil {
+		return err
+	}
+	// v2 proxy caches members; avoid reuse of directory
+	return os.RemoveAll(v2p.dataDir)
+}
+
+type proxyV3Proc struct {
+	proxyProc
+}
+
+func newProxyV3Proc(cfg *etcdServerProcessConfig) *proxyV3Proc {
+	listenAddr := proxyListenURL(cfg, 3)
+	args := []string{
+		"grpc-proxy",
+		"start",
+		"--listen-addr", strings.Split(listenAddr, "/")[2],
+		"--endpoints", cfg.acurl,
+		// pass-through member RPCs
+		"--advertise-client-url", "",
+	}
+	tlsArgs := []string{}
+	for i := 0; i < len(cfg.tlsArgs); i++ {
+		switch cfg.tlsArgs[i] {
+		case "--cert-file":
+			tlsArgs = append(tlsArgs, "--cert", cfg.tlsArgs[i+1], "--cert-file", cfg.tlsArgs[i+1])
+			i++
+		case "--key-file":
+			tlsArgs = append(tlsArgs, "--key", cfg.tlsArgs[i+1], "--key-file", cfg.tlsArgs[i+1])
+			i++
+		case "--ca-file":
+			tlsArgs = append(tlsArgs, "--cacert", cfg.tlsArgs[i+1], "--trusted-ca-file", cfg.tlsArgs[i+1])
+			i++
+		case "--auto-tls":
+			tlsArgs = append(tlsArgs, "--auto-tls", "--insecure-skip-tls-verify")
+		case "--peer-ca-file", "--peer-cert-file", "--peer-key-file":
+			i++ // skip arg
+		case "--client-cert-auth", "--peer-auto-tls":
+		default:
+			tlsArgs = append(tlsArgs, cfg.tlsArgs[i])
+		}
+	}
+	return &proxyV3Proc{
+		proxyProc{
+			execPath: cfg.execPath,
+			args:     append(args, tlsArgs...),
+			ep:       listenAddr,
+			donec:    make(chan struct{}),
+		},
+	}
+}
+
+func (v3p *proxyV3Proc) Restart() error {
+	if err := v3p.Stop(); err != nil {
+		return err
+	}
+	return v3p.Start()
+}
+
+func (v3p *proxyV3Proc) Start() error {
+	if err := v3p.start(); err != nil {
+		return err
+	}
+	return v3p.waitReady("listening for grpc-proxy client requests")
+}

+ 359 - 0
e2e/cluster_test.go

@@ -0,0 +1,359 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/url"
+	"os"
+	"strings"
+
+	"github.com/coreos/etcd/etcdserver"
+)
+
+const etcdProcessBasePort = 20000
+
+type clientConnType int
+
+const (
+	clientNonTLS clientConnType = iota
+	clientTLS
+	clientTLSAndNonTLS
+)
+
+var (
+	configNoTLS = etcdProcessClusterConfig{
+		clusterSize:  3,
+		initialToken: "new",
+	}
+	configAutoTLS = etcdProcessClusterConfig{
+		clusterSize:   3,
+		isPeerTLS:     true,
+		isPeerAutoTLS: true,
+		initialToken:  "new",
+	}
+	configTLS = etcdProcessClusterConfig{
+		clusterSize:  3,
+		clientTLS:    clientTLS,
+		isPeerTLS:    true,
+		initialToken: "new",
+	}
+	configClientTLS = etcdProcessClusterConfig{
+		clusterSize:  3,
+		clientTLS:    clientTLS,
+		initialToken: "new",
+	}
+	configClientBoth = etcdProcessClusterConfig{
+		clusterSize:  1,
+		clientTLS:    clientTLSAndNonTLS,
+		initialToken: "new",
+	}
+	configClientAutoTLS = etcdProcessClusterConfig{
+		clusterSize:     1,
+		isClientAutoTLS: true,
+		clientTLS:       clientTLS,
+		initialToken:    "new",
+	}
+	configPeerTLS = etcdProcessClusterConfig{
+		clusterSize:  3,
+		isPeerTLS:    true,
+		initialToken: "new",
+	}
+	configClientTLSCertAuth = etcdProcessClusterConfig{
+		clusterSize:           1,
+		clientTLS:             clientTLS,
+		initialToken:          "new",
+		clientCertAuthEnabled: true,
+	}
+)
+
+func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
+	ret := cfg
+	ret.clusterSize = 1
+	return &ret
+}
+
+type etcdProcessCluster struct {
+	cfg   *etcdProcessClusterConfig
+	procs []etcdProcess
+}
+
+type etcdProcessClusterConfig struct {
+	execPath    string
+	dataDirPath string
+	keepDataDir bool
+
+	clusterSize int
+
+	baseScheme string
+	basePort   int
+
+	snapCount int // default is 10000
+
+	clientTLS             clientConnType
+	clientCertAuthEnabled bool
+	isPeerTLS             bool
+	isPeerAutoTLS         bool
+	isClientAutoTLS       bool
+	isClientCRL           bool
+
+	forceNewCluster   bool
+	initialToken      string
+	quotaBackendBytes int64
+	noStrictReconfig  bool
+}
+
+// newEtcdProcessCluster launches a new cluster from etcd processes, returning
+// a new etcdProcessCluster once all nodes are ready to accept client requests.
+func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
+	etcdCfgs := cfg.etcdServerProcessConfigs()
+	epc := &etcdProcessCluster{
+		cfg:   cfg,
+		procs: make([]etcdProcess, cfg.clusterSize),
+	}
+
+	// launch etcd processes
+	for i := range etcdCfgs {
+		proc, err := newEtcdProcess(etcdCfgs[i])
+		if err != nil {
+			epc.Close()
+			return nil, err
+		}
+		epc.procs[i] = proc
+	}
+
+	if err := epc.Start(); err != nil {
+		return nil, err
+	}
+	return epc, nil
+}
+
+func (cfg *etcdProcessClusterConfig) clientScheme() string {
+	if cfg.clientTLS == clientTLS {
+		return "https"
+	}
+	return "http"
+}
+
+func (cfg *etcdProcessClusterConfig) peerScheme() string {
+	peerScheme := cfg.baseScheme
+	if peerScheme == "" {
+		peerScheme = "http"
+	}
+	if cfg.isPeerTLS {
+		peerScheme += "s"
+	}
+	return peerScheme
+}
+
+func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
+	if cfg.basePort == 0 {
+		cfg.basePort = etcdProcessBasePort
+	}
+	if cfg.execPath == "" {
+		cfg.execPath = binPath
+	}
+	if cfg.snapCount == 0 {
+		cfg.snapCount = etcdserver.DefaultSnapCount
+	}
+
+	etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
+	initialCluster := make([]string, cfg.clusterSize)
+	for i := 0; i < cfg.clusterSize; i++ {
+		var curls []string
+		var curl, curltls string
+		port := cfg.basePort + 4*i
+		curlHost := fmt.Sprintf("localhost:%d", port)
+
+		switch cfg.clientTLS {
+		case clientNonTLS, clientTLS:
+			curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
+			curls = []string{curl}
+		case clientTLSAndNonTLS:
+			curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
+			curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
+			curls = []string{curl, curltls}
+		}
+
+		purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
+		name := fmt.Sprintf("testname%d", i)
+		dataDirPath := cfg.dataDirPath
+		if cfg.dataDirPath == "" {
+			var derr error
+			dataDirPath, derr = ioutil.TempDir("", name+".etcd")
+			if derr != nil {
+				panic("could not get tempdir for datadir")
+			}
+		}
+		initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
+
+		args := []string{
+			"--name", name,
+			"--listen-client-urls", strings.Join(curls, ","),
+			"--advertise-client-urls", strings.Join(curls, ","),
+			"--listen-peer-urls", purl.String(),
+			"--initial-advertise-peer-urls", purl.String(),
+			"--initial-cluster-token", cfg.initialToken,
+			"--data-dir", dataDirPath,
+			"--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
+		}
+		if cfg.forceNewCluster {
+			args = append(args, "--force-new-cluster")
+		}
+		if cfg.quotaBackendBytes > 0 {
+			args = append(args,
+				"--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
+			)
+		}
+		if cfg.noStrictReconfig {
+			args = append(args, "--strict-reconfig-check=false")
+		}
+
+		args = append(args, cfg.tlsArgs()...)
+		etcdCfgs[i] = &etcdServerProcessConfig{
+			execPath:     cfg.execPath,
+			args:         args,
+			tlsArgs:      cfg.tlsArgs(),
+			dataDirPath:  dataDirPath,
+			keepDataDir:  cfg.keepDataDir,
+			name:         name,
+			purl:         purl,
+			acurl:        curl,
+			initialToken: cfg.initialToken,
+		}
+	}
+
+	initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
+	for i := range etcdCfgs {
+		etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
+		etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
+	}
+
+	return etcdCfgs
+}
+
+func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
+	if cfg.clientTLS != clientNonTLS {
+		if cfg.isClientAutoTLS {
+			args = append(args, "--auto-tls")
+		} else {
+			tlsClientArgs := []string{
+				"--cert-file", certPath,
+				"--key-file", privateKeyPath,
+				"--ca-file", caPath,
+			}
+			args = append(args, tlsClientArgs...)
+
+			if cfg.clientCertAuthEnabled {
+				args = append(args, "--client-cert-auth")
+			}
+		}
+	}
+
+	if cfg.isPeerTLS {
+		if cfg.isPeerAutoTLS {
+			args = append(args, "--peer-auto-tls")
+		} else {
+			tlsPeerArgs := []string{
+				"--peer-cert-file", certPath,
+				"--peer-key-file", privateKeyPath,
+				"--peer-ca-file", caPath,
+			}
+			args = append(args, tlsPeerArgs...)
+		}
+	}
+
+	if cfg.isClientCRL {
+		args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
+	}
+
+	return args
+}
+
+func (epc *etcdProcessCluster) EndpointsV2() []string {
+	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
+}
+
+func (epc *etcdProcessCluster) EndpointsV3() []string {
+	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
+}
+
+func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
+	for _, p := range epc.procs {
+		ret = append(ret, f(p)...)
+	}
+	return ret
+}
+
+func (epc *etcdProcessCluster) Start() error {
+	return epc.start(func(ep etcdProcess) error { return ep.Start() })
+}
+
+func (epc *etcdProcessCluster) Restart() error {
+	return epc.start(func(ep etcdProcess) error { return ep.Restart() })
+}
+
+func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
+	readyC := make(chan error, len(epc.procs))
+	for i := range epc.procs {
+		go func(n int) { readyC <- f(epc.procs[n]) }(i)
+	}
+	for range epc.procs {
+		if err := <-readyC; err != nil {
+			epc.Close()
+			return err
+		}
+	}
+	return nil
+}
+
+func (epc *etcdProcessCluster) Stop() (err error) {
+	for _, p := range epc.procs {
+		if p == nil {
+			continue
+		}
+		if curErr := p.Stop(); curErr != nil {
+			if err != nil {
+				err = fmt.Errorf("%v; %v", err, curErr)
+			} else {
+				err = curErr
+			}
+		}
+	}
+	return err
+}
+
+func (epc *etcdProcessCluster) Close() error {
+	err := epc.Stop()
+	for _, p := range epc.procs {
+		// p is nil when newEtcdProcess fails in the middle
+		// Close still gets called to clean up test data
+		if p == nil {
+			continue
+		}
+		if cerr := p.Close(); cerr != nil {
+			err = cerr
+		}
+	}
+	return err
+}
+
+func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
+	for _, p := range epc.procs {
+		ret = p.WithStopSignal(sig)
+	}
+	return ret
+}

+ 9 - 21
e2e/ctl_v2_test.go

@@ -128,10 +128,9 @@ func testCtlV2Ls(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool) {
 	}
 }
 
-func TestCtlV2Watch(t *testing.T)                { testCtlV2Watch(t, &configNoTLS, false) }
-func TestCtlV2WatchTLS(t *testing.T)             { testCtlV2Watch(t, &configTLS, false) }
-func TestCtlV2WatchWithProxy(t *testing.T)       { testCtlV2Watch(t, &configWithProxy, false) }
-func TestCtlV2WatchWithProxyNoSync(t *testing.T) { testCtlV2Watch(t, &configWithProxy, true) }
+func TestCtlV2Watch(t *testing.T)    { testCtlV2Watch(t, &configNoTLS, false) }
+func TestCtlV2WatchTLS(t *testing.T) { testCtlV2Watch(t, &configTLS, false) }
+
 func testCtlV2Watch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) {
 	defer testutil.AfterTest(t)
 
@@ -158,12 +157,10 @@ func testCtlV2Watch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) {
 	}
 }
 
-func TestCtlV2GetRoleUser(t *testing.T)          { testCtlV2GetRoleUser(t, &configNoTLS) }
-func TestCtlV2GetRoleUserWithProxy(t *testing.T) { testCtlV2GetRoleUser(t, &configWithProxy) }
-func testCtlV2GetRoleUser(t *testing.T, cfg *etcdProcessClusterConfig) {
+func TestCtlV2GetRoleUser(t *testing.T) {
 	defer testutil.AfterTest(t)
 
-	epc := setupEtcdctlTest(t, cfg, false)
+	epc := setupEtcdctlTest(t, &configNoTLS, false)
 	defer func() {
 		if err := epc.Close(); err != nil {
 			t.Fatalf("error closing etcd processes (%v)", err)
@@ -196,7 +193,7 @@ func TestCtlV2UserListRoot(t *testing.T)     { testCtlV2UserList(t, "root") }
 func testCtlV2UserList(t *testing.T, username string) {
 	defer testutil.AfterTest(t)
 
-	epc := setupEtcdctlTest(t, &configWithProxy, false)
+	epc := setupEtcdctlTest(t, &configNoTLS, false)
 	defer func() {
 		if err := epc.Close(); err != nil {
 			t.Fatalf("error closing etcd processes (%v)", err)
@@ -214,7 +211,7 @@ func testCtlV2UserList(t *testing.T, username string) {
 func TestCtlV2RoleList(t *testing.T) {
 	defer testutil.AfterTest(t)
 
-	epc := setupEtcdctlTest(t, &configWithProxy, false)
+	epc := setupEtcdctlTest(t, &configNoTLS, false)
 	defer func() {
 		if err := epc.Close(); err != nil {
 			t.Fatalf("error closing etcd processes (%v)", err)
@@ -243,7 +240,7 @@ func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issue
 		t.Fatal(err)
 	}
 
-	if err := etcdctlBackup(epc1, epc1.procs[0].cfg.dataDirPath, backupDir); err != nil {
+	if err := etcdctlBackup(epc1, epc1.procs[0].Config().dataDirPath, backupDir); err != nil {
 		t.Fatal(err)
 	}
 
@@ -350,16 +347,7 @@ func TestCtlV2ClusterHealth(t *testing.T) {
 }
 
 func etcdctlPrefixArgs(clus *etcdProcessCluster) []string {
-	endpoints := ""
-	if proxies := clus.proxies(); len(proxies) != 0 {
-		endpoints = proxies[0].cfg.acurl
-	} else if processes := clus.processes(); len(processes) != 0 {
-		es := []string{}
-		for _, b := range processes {
-			es = append(es, b.cfg.acurl)
-		}
-		endpoints = strings.Join(es, ",")
-	}
+	endpoints := strings.Join(clus.EndpointsV2(), ",")
 	cmdArgs := []string{ctlBinPath, "--endpoints", endpoints}
 	if clus.cfg.clientTLS == clientTLS {
 		cmdArgs = append(cmdArgs, "--ca-file", caPath, "--cert-file", certPath, "--key-file", privateKeyPath)

+ 1 - 1
e2e/ctl_v3_alarm_test.go

@@ -64,7 +64,7 @@ func alarmTest(cx ctlCtx) {
 		}
 	}
 
-	eps := cx.epc.grpcEndpoints()
+	eps := cx.epc.EndpointsV3()
 
 	// get latest revision to compact
 	cli, err := clientv3.New(clientv3.Config{

+ 99 - 0
e2e/ctl_v3_auth_test.go

@@ -12,10 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Skip proxy tests for now since auth is broken on grpcproxy.
+// +build !cluster_proxy
+
 package e2e
 
 import (
 	"fmt"
+	"os"
 	"testing"
 
 	"github.com/coreos/etcd/clientv3"
@@ -44,6 +48,12 @@ func TestCtlV3AuthRoleGet(t *testing.T)  { testCtl(t, authTestRoleGet) }
 func TestCtlV3AuthUserGet(t *testing.T)  { testCtl(t, authTestUserGet) }
 func TestCtlV3AuthRoleList(t *testing.T) { testCtl(t, authTestRoleList) }
 
+func TestCtlV3AuthDefrag(t *testing.T) { testCtl(t, authTestDefrag) }
+func TestCtlV3AuthEndpointHealth(t *testing.T) {
+	testCtl(t, authTestEndpointHealth, withQuorum())
+}
+func TestCtlV3AuthSnapshot(t *testing.T) { testCtl(t, authTestSnapshot) }
+
 func authEnableTest(cx ctlCtx) {
 	if err := authEnable(cx); err != nil {
 		cx.t.Fatal(err)
@@ -816,3 +826,92 @@ func authTestRoleList(cx ctlCtx) {
 		cx.t.Fatal(err)
 	}
 }
+
+func authTestDefrag(cx ctlCtx) {
+	maintenanceInitKeys(cx)
+
+	if err := authEnable(cx); err != nil {
+		cx.t.Fatal(err)
+	}
+
+	cx.user, cx.pass = "root", "root"
+	authSetupTestUser(cx)
+
+	// ordinary user cannot defrag
+	cx.user, cx.pass = "test-user", "pass"
+	if err := ctlV3Defrag(cx); err == nil {
+		cx.t.Fatal("ordinary user should not be able to issue a defrag request")
+	}
+
+	// root can defrag
+	cx.user, cx.pass = "root", "root"
+	if err := ctlV3Defrag(cx); err != nil {
+		cx.t.Fatal(err)
+	}
+}
+
+func authTestSnapshot(cx ctlCtx) {
+	maintenanceInitKeys(cx)
+
+	if err := authEnable(cx); err != nil {
+		cx.t.Fatal(err)
+	}
+
+	cx.user, cx.pass = "root", "root"
+	authSetupTestUser(cx)
+
+	fpath := "test.snapshot"
+	defer os.RemoveAll(fpath)
+
+	// ordinary user cannot save a snapshot
+	cx.user, cx.pass = "test-user", "pass"
+	if err := ctlV3SnapshotSave(cx, fpath); err == nil {
+		cx.t.Fatal("ordinary user should not be able to save a snapshot")
+	}
+
+	// root can save a snapshot
+	cx.user, cx.pass = "root", "root"
+	if err := ctlV3SnapshotSave(cx, fpath); err != nil {
+		cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
+	}
+
+	st, err := getSnapshotStatus(cx, fpath)
+	if err != nil {
+		cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
+	}
+	if st.Revision != 4 {
+		cx.t.Fatalf("expected 4, got %d", st.Revision)
+	}
+	if st.TotalKey < 3 {
+		cx.t.Fatalf("expected at least 3, got %d", st.TotalKey)
+	}
+}
+
+func authTestEndpointHealth(cx ctlCtx) {
+	if err := authEnable(cx); err != nil {
+		cx.t.Fatal(err)
+	}
+
+	cx.user, cx.pass = "root", "root"
+	authSetupTestUser(cx)
+
+	if err := ctlV3EndpointHealth(cx); err != nil {
+		cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
+	}
+
+	// health checking with an ordinary user "succeeds" since permission denial goes through consensus
+	cx.user, cx.pass = "test-user", "pass"
+	if err := ctlV3EndpointHealth(cx); err != nil {
+		cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
+	}
+
+	// succeed if permissions granted for ordinary user
+	cx.user, cx.pass = "root", "root"
+	if err := ctlV3RoleGrantPermission(cx, "test-role", grantingPerm{true, true, "health", "", false}); err != nil {
+		cx.t.Fatal(err)
+	}
+	cx.user, cx.pass = "test-user", "pass"
+	if err := ctlV3EndpointHealth(cx); err != nil {
+		cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
+	}
+}

+ 1 - 25
e2e/ctl_v3_defrag_test.go

@@ -16,8 +16,7 @@ package e2e
 
 import "testing"
 
-func TestCtlV3Defrag(t *testing.T)         { testCtl(t, defragTest) }
-func TestCtlV3DefragWithAuth(t *testing.T) { testCtl(t, defragTestWithAuth) }
+func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) }
 
 func maintenanceInitKeys(cx ctlCtx) {
 	var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}}
@@ -40,29 +39,6 @@ func defragTest(cx ctlCtx) {
 	}
 }
 
-func defragTestWithAuth(cx ctlCtx) {
-	maintenanceInitKeys(cx)
-
-	if err := authEnable(cx); err != nil {
-		cx.t.Fatal(err)
-	}
-
-	cx.user, cx.pass = "root", "root"
-	authSetupTestUser(cx)
-
-	// ordinary user cannot defrag
-	cx.user, cx.pass = "test-user", "pass"
-	if err := ctlV3Defrag(cx); err == nil {
-		cx.t.Fatal("ordinary user should not be able to issue a defrag request")
-	}
-
-	// root can defrag
-	cx.user, cx.pass = "root", "root"
-	if err := ctlV3Defrag(cx); err != nil {
-		cx.t.Fatal(err)
-	}
-}
-
 func ctlV3Defrag(cx ctlCtx) error {
 	cmdArgs := append(cx.PrefixArgs(), "defrag")
 	lines := make([]string, cx.epc.cfg.clusterSize)

+ 2 - 2
e2e/ctl_v3_elect_test.go

@@ -33,8 +33,8 @@ func TestCtlV3Elect(t *testing.T) {
 
 func testElect(cx ctlCtx) {
 	// debugging for #6934
-	sig := cx.epc.withStopSignal(debugLockSignal)
-	defer cx.epc.withStopSignal(sig)
+	sig := cx.epc.WithStopSignal(debugLockSignal)
+	defer cx.epc.WithStopSignal(sig)
 
 	name := "a"
 

+ 1 - 33
e2e/ctl_v3_endpoint_test.go

@@ -21,9 +21,6 @@ import (
 
 func TestCtlV3EndpointHealth(t *testing.T) { testCtl(t, endpointHealthTest, withQuorum()) }
 func TestCtlV3EndpointStatus(t *testing.T) { testCtl(t, endpointStatusTest, withQuorum()) }
-func TestCtlV3EndpointHealthWithAuth(t *testing.T) {
-	testCtl(t, endpointHealthTestWithAuth, withQuorum())
-}
 
 func endpointHealthTest(cx ctlCtx) {
 	if err := ctlV3EndpointHealth(cx); err != nil {
@@ -49,38 +46,9 @@ func endpointStatusTest(cx ctlCtx) {
 func ctlV3EndpointStatus(cx ctlCtx) error {
 	cmdArgs := append(cx.PrefixArgs(), "endpoint", "status")
 	var eps []string
-	for _, ep := range cx.epc.endpoints() {
+	for _, ep := range cx.epc.EndpointsV3() {
 		u, _ := url.Parse(ep)
 		eps = append(eps, u.Host)
 	}
 	return spawnWithExpects(cmdArgs, eps...)
 }
-
-func endpointHealthTestWithAuth(cx ctlCtx) {
-	if err := authEnable(cx); err != nil {
-		cx.t.Fatal(err)
-	}
-
-	cx.user, cx.pass = "root", "root"
-	authSetupTestUser(cx)
-
-	if err := ctlV3EndpointHealth(cx); err != nil {
-		cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
-	}
-
-	// health checking with an ordinary user "succeeds" since permission denial goes through consensus
-	cx.user, cx.pass = "test-user", "pass"
-	if err := ctlV3EndpointHealth(cx); err != nil {
-		cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
-	}
-
-	// succeed if permissions granted for ordinary user
-	cx.user, cx.pass = "root", "root"
-	if err := ctlV3RoleGrantPermission(cx, "test-role", grantingPerm{true, true, "health", "", false}); err != nil {
-		cx.t.Fatal(err)
-	}
-	cx.user, cx.pass = "test-user", "pass"
-	if err := ctlV3EndpointHealth(cx); err != nil {
-		cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
-	}
-}

+ 2 - 2
e2e/ctl_v3_lock_test.go

@@ -56,8 +56,8 @@ func TestCtlV3Lock(t *testing.T) {
 
 func testLock(cx ctlCtx) {
 	// debugging for #6464
-	sig := cx.epc.withStopSignal(debugLockSignal)
-	defer cx.epc.withStopSignal(sig)
+	sig := cx.epc.WithStopSignal(debugLockSignal)
+	defer cx.epc.WithStopSignal(sig)
 
 	name := "a"
 

+ 5 - 5
e2e/ctl_v3_migrate_test.go

@@ -48,8 +48,8 @@ func TestCtlV3Migrate(t *testing.T) {
 		}
 	}
 
-	dataDir := epc.procs[0].cfg.dataDirPath
-	if err := epc.StopAll(); err != nil {
+	dataDir := epc.procs[0].Config().dataDirPath
+	if err := epc.Stop(); err != nil {
 		t.Fatalf("error closing etcd processes (%v)", err)
 	}
 
@@ -65,8 +65,8 @@ func TestCtlV3Migrate(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	epc.procs[0].cfg.keepDataDir = true
-	if err := epc.RestartAll(); err != nil {
+	epc.procs[0].Config().keepDataDir = true
+	if err := epc.Restart(); err != nil {
 		t.Fatal(err)
 	}
 
@@ -75,7 +75,7 @@ func TestCtlV3Migrate(t *testing.T) {
 		t.Fatal(err)
 	}
 	cli, err := clientv3.New(clientv3.Config{
-		Endpoints:   epc.grpcEndpoints(),
+		Endpoints:   epc.EndpointsV3(),
 		DialTimeout: 3 * time.Second,
 	})
 	if err != nil {

+ 3 - 3
e2e/ctl_v3_move_leader_test.go

@@ -39,7 +39,7 @@ func TestCtlV3MoveLeader(t *testing.T) {
 	var leadIdx int
 	var leaderID uint64
 	var transferee uint64
-	for i, ep := range epc.grpcEndpoints() {
+	for i, ep := range epc.EndpointsV3() {
 		cli, err := clientv3.New(clientv3.Config{
 			Endpoints:   []string{ep},
 			DialTimeout: 3 * time.Second,
@@ -75,11 +75,11 @@ func TestCtlV3MoveLeader(t *testing.T) {
 		expect   string
 	}{
 		{ // request to non-leader
-			cx.prefixArgs([]string{cx.epc.grpcEndpoints()[(leadIdx+1)%3]}),
+			cx.prefixArgs([]string{cx.epc.EndpointsV3()[(leadIdx+1)%3]}),
 			"no leader endpoint given at ",
 		},
 		{ // request to leader
-			cx.prefixArgs([]string{cx.epc.grpcEndpoints()[leadIdx]}),
+			cx.prefixArgs([]string{cx.epc.EndpointsV3()[leadIdx]}),
 			fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)),
 		},
 	}

+ 10 - 49
e2e/ctl_v3_snapshot_test.go

@@ -152,7 +152,7 @@ func TestIssue6361(t *testing.T) {
 	}()
 
 	dialTimeout := 7 * time.Second
-	prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.grpcEndpoints(), ","), "--dial-timeout", dialTimeout.String()}
+	prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ","), "--dial-timeout", dialTimeout.String()}
 
 	// write some keys
 	kvs := []kv{{"foo1", "val1"}, {"foo2", "val2"}, {"foo3", "val3"}}
@@ -170,7 +170,7 @@ func TestIssue6361(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	if err = epc.processes()[0].Stop(); err != nil {
+	if err = epc.procs[0].Stop(); err != nil {
 		t.Fatal(err)
 	}
 
@@ -178,19 +178,19 @@ func TestIssue6361(t *testing.T) {
 	defer os.RemoveAll(newDataDir)
 
 	// etcdctl restore the snapshot
-	err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].cfg.name, "--initial-cluster", epc.procs[0].cfg.initialCluster, "--initial-cluster-token", epc.procs[0].cfg.initialToken, "--initial-advertise-peer-urls", epc.procs[0].cfg.purl.String(), "--data-dir", newDataDir}, "membership: added member")
+	err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].Config().name, "--initial-cluster", epc.procs[0].Config().initialCluster, "--initial-cluster-token", epc.procs[0].Config().initialToken, "--initial-advertise-peer-urls", epc.procs[0].Config().purl.String(), "--data-dir", newDataDir}, "membership: added member")
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	// start the etcd member using the restored snapshot
-	epc.procs[0].cfg.dataDirPath = newDataDir
-	for i := range epc.procs[0].cfg.args {
-		if epc.procs[0].cfg.args[i] == "--data-dir" {
-			epc.procs[0].cfg.args[i+1] = newDataDir
+	epc.procs[0].Config().dataDirPath = newDataDir
+	for i := range epc.procs[0].Config().args {
+		if epc.procs[0].Config().args[i] == "--data-dir" {
+			epc.procs[0].Config().args[i+1] = newDataDir
 		}
 	}
-	if err = epc.processes()[0].Restart(); err != nil {
+	if err = epc.procs[0].Restart(); err != nil {
 		t.Fatal(err)
 	}
 
@@ -217,11 +217,11 @@ func TestIssue6361(t *testing.T) {
 	defer os.RemoveAll(newDataDir2)
 
 	name2 := "infra2"
-	initialCluster2 := epc.procs[0].cfg.initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL)
+	initialCluster2 := epc.procs[0].Config().initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL)
 
 	// start the new member
 	var nepc *expect.ExpectProcess
-	nepc, err = spawnCmd([]string{epc.procs[0].cfg.execPath, "--name", name2,
+	nepc, err = spawnCmd([]string{epc.procs[0].Config().execPath, "--name", name2,
 		"--listen-client-urls", clientURL, "--advertise-client-urls", clientURL,
 		"--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL,
 		"--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2})
@@ -245,42 +245,3 @@ func TestIssue6361(t *testing.T) {
 		t.Fatal(err)
 	}
 }
-
-func TestCtlV3SnapshotWithAuth(t *testing.T) { testCtl(t, snapshotTestWithAuth) }
-
-func snapshotTestWithAuth(cx ctlCtx) {
-	maintenanceInitKeys(cx)
-
-	if err := authEnable(cx); err != nil {
-		cx.t.Fatal(err)
-	}
-
-	cx.user, cx.pass = "root", "root"
-	authSetupTestUser(cx)
-
-	fpath := "test.snapshot"
-	defer os.RemoveAll(fpath)
-
-	// ordinary user cannot save a snapshot
-	cx.user, cx.pass = "test-user", "pass"
-	if err := ctlV3SnapshotSave(cx, fpath); err == nil {
-		cx.t.Fatal("ordinary user should not be able to save a snapshot")
-	}
-
-	// root can save a snapshot
-	cx.user, cx.pass = "root", "root"
-	if err := ctlV3SnapshotSave(cx, fpath); err != nil {
-		cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
-	}
-
-	st, err := getSnapshotStatus(cx, fpath)
-	if err != nil {
-		cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
-	}
-	if st.Revision != 4 {
-		cx.t.Fatalf("expected 4, got %d", st.Revision)
-	}
-	if st.TotalKey < 3 {
-		cx.t.Fatalf("expected at least 3, got %d", st.TotalKey)
-	}
-}

+ 2 - 6
e2e/ctl_v3_test.go

@@ -45,7 +45,7 @@ func TestCtlV3DialWithHTTPScheme(t *testing.T) {
 }
 
 func dialWithSchemeTest(cx ctlCtx) {
-	cmdArgs := append(cx.prefixArgs(cx.epc.endpoints()), "put", "foo", "bar")
+	cmdArgs := append(cx.prefixArgs(cx.epc.EndpointsV3()), "put", "foo", "bar")
 	if err := spawnWithExpect(cmdArgs, "OK"); err != nil {
 		cx.t.Fatal(err)
 	}
@@ -169,10 +169,6 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
 }
 
 func (cx *ctlCtx) prefixArgs(eps []string) []string {
-	if len(cx.epc.proxies()) > 0 { // TODO: add proxy check as in v2
-		panic("v3 proxy not implemented")
-	}
-
 	fmap := make(map[string]string)
 	fmap["endpoints"] = strings.Join(eps, ",")
 	fmap["dial-timeout"] = cx.dialTimeout.String()
@@ -212,7 +208,7 @@ func (cx *ctlCtx) prefixArgs(eps []string) []string {
 // PrefixArgs prefixes etcdctl command.
 // Make sure to unset environment variables after tests.
 func (cx *ctlCtx) PrefixArgs() []string {
-	return cx.prefixArgs(cx.epc.grpcEndpoints())
+	return cx.prefixArgs(cx.epc.EndpointsV3())
 }
 
 func isGRPCTimedout(err error) bool {

+ 1 - 1
e2e/etcd_config_test.go

@@ -25,7 +25,7 @@ func TestEtcdExampleConfig(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	if err = waitReadyExpectProc(proc, false); err != nil {
+	if err = waitReadyExpectProc(proc, etcdServerReadyLines); err != nil {
 		t.Fatal(err)
 	}
 	if err = proc.Stop(); err != nil {

+ 134 - 0
e2e/etcd_process.go

@@ -0,0 +1,134 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"fmt"
+	"net/url"
+	"os"
+
+	"github.com/coreos/etcd/pkg/expect"
+	"github.com/coreos/etcd/pkg/fileutil"
+)
+
+var etcdServerReadyLines = []string{"enabled capabilities for version", "published"}
+
+// etcdProcess is a process that serves etcd requests.
+type etcdProcess interface {
+	EndpointsV2() []string
+	EndpointsV3() []string
+
+	Start() error
+	Restart() error
+	Stop() error
+	Close() error
+	WithStopSignal(sig os.Signal) os.Signal
+	Config() *etcdServerProcessConfig
+}
+
+type etcdServerProcess struct {
+	cfg   *etcdServerProcessConfig
+	proc  *expect.ExpectProcess
+	donec chan struct{} // closed when Interact() terminates
+}
+
+type etcdServerProcessConfig struct {
+	execPath string
+	args     []string
+	tlsArgs  []string
+
+	dataDirPath string
+	keepDataDir bool
+
+	name string
+
+	purl url.URL
+
+	acurl string
+
+	initialToken   string
+	initialCluster string
+}
+
+func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, error) {
+	if !fileutil.Exist(cfg.execPath) {
+		return nil, fmt.Errorf("could not find etcd binary")
+	}
+	if !cfg.keepDataDir {
+		if err := os.RemoveAll(cfg.dataDirPath); err != nil {
+			return nil, err
+		}
+	}
+	return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
+}
+
+func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} }
+func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
+
+func (ep *etcdServerProcess) Start() error {
+	if ep.proc != nil {
+		panic("already started")
+	}
+	proc, err := spawnCmd(append([]string{ep.cfg.execPath}, ep.cfg.args...))
+	if err != nil {
+		return err
+	}
+	ep.proc = proc
+	return ep.waitReady()
+}
+
+func (ep *etcdServerProcess) Restart() error {
+	if err := ep.Stop(); err != nil {
+		return err
+	}
+	ep.donec = make(chan struct{})
+	return ep.Start()
+}
+
+func (ep *etcdServerProcess) Stop() error {
+	if ep == nil || ep.proc == nil {
+		return nil
+	}
+	if err := ep.proc.Stop(); err != nil {
+		return err
+	}
+	ep.proc = nil
+	<-ep.donec
+	ep.donec = make(chan struct{})
+	if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" {
+		os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path)
+	}
+	return nil
+}
+
+func (ep *etcdServerProcess) Close() error {
+	if err := ep.Stop(); err != nil {
+		return err
+	}
+	return os.RemoveAll(ep.cfg.dataDirPath)
+}
+
+func (ep *etcdServerProcess) WithStopSignal(sig os.Signal) os.Signal {
+	ret := ep.proc.StopSignal
+	ep.proc.StopSignal = sig
+	return ret
+}
+
+func (ep *etcdServerProcess) waitReady() error {
+	defer close(ep.donec)
+	return waitReadyExpectProc(ep.proc, etcdServerReadyLines)
+}
+
+func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg }

+ 4 - 4
e2e/etcd_release_upgrade_test.go

@@ -88,8 +88,8 @@ func TestReleaseUpgrade(t *testing.T) {
 		if err := epc.procs[i].Stop(); err != nil {
 			t.Fatalf("#%d: error closing etcd process (%v)", i, err)
 		}
-		epc.procs[i].cfg.execPath = binDir + "/etcd"
-		epc.procs[i].cfg.keepDataDir = true
+		epc.procs[i].Config().execPath = binDir + "/etcd"
+		epc.procs[i].Config().keepDataDir = true
 
 		if err := epc.procs[i].Restart(); err != nil {
 			t.Fatalf("error restarting etcd process (%v)", err)
@@ -155,8 +155,8 @@ func TestReleaseUpgradeWithRestart(t *testing.T) {
 	wg.Add(len(epc.procs))
 	for i := range epc.procs {
 		go func(i int) {
-			epc.procs[i].cfg.execPath = binDir + "/etcd"
-			epc.procs[i].cfg.keepDataDir = true
+			epc.procs[i].Config().execPath = binDir + "/etcd"
+			epc.procs[i].Config().keepDataDir = true
 			if err := epc.procs[i].Restart(); err != nil {
 				t.Fatalf("error restarting etcd process (%v)", err)
 			}

+ 28 - 15
e2e/etcd_spawn_cov.go

@@ -33,20 +33,7 @@ const noOutputLineCount = 2 // cov-enabled binaries emit PASS and coverage count
 
 func spawnCmd(args []string) (*expect.ExpectProcess, error) {
 	if args[0] == binPath {
-		covArgs, err := getCovArgs()
-		if err != nil {
-			return nil, err
-		}
-		ep, err := expect.NewExpectWithEnv(binDir+"/etcd_test", covArgs, args2env(args[1:]))
-		if err != nil {
-			return nil, err
-		}
-		// ep sends SIGTERM to etcd_test process on ep.close()
-		// allowing the process to exit gracefully in order to generate a coverage report.
-		// note: go runtime ignores SIGINT but not SIGTERM
-		// if e2e test is run as a background process.
-		ep.StopSignal = syscall.SIGTERM
-		return ep, nil
+		return spawnEtcd(args)
 	}
 
 	if args[0] == ctlBinPath {
@@ -73,6 +60,32 @@ func spawnCmd(args []string) (*expect.ExpectProcess, error) {
 	return expect.NewExpect(args[0], args[1:]...)
 }
 
+func spawnEtcd(args []string) (*expect.ExpectProcess, error) {
+	covArgs, err := getCovArgs()
+	if err != nil {
+		return nil, err
+	}
+
+	env := []string{}
+	if args[1] == "grpc-proxy" {
+		// avoid test flag conflicts in coverage enabled etcd by putting flags in ETCDCOV_ARGS
+		env = append(os.Environ(), "ETCDCOV_ARGS="+strings.Join(args, "\xe7\xcd"))
+	} else {
+		env = args2env(args[1:])
+	}
+
+	ep, err := expect.NewExpectWithEnv(binDir+"/etcd_test", covArgs, env)
+	if err != nil {
+		return nil, err
+	}
+	// ep sends SIGTERM to etcd_test process on ep.close()
+	// allowing the process to exit gracefully in order to generate a coverage report.
+	// note: go runtime ignores SIGINT but not SIGTERM
+	// if e2e test is run as a background process.
+	ep.StopSignal = syscall.SIGTERM
+	return ep, nil
+}
+
 func getCovArgs() ([]string, error) {
 	coverPath := os.Getenv("COVERDIR")
 	if !filepath.IsAbs(coverPath) {
@@ -92,7 +105,7 @@ func getCovArgs() ([]string, error) {
 
 func args2env(args []string) []string {
 	var covEnvs []string
-	for i := range args[1:] {
+	for i := range args {
 		if !strings.HasPrefix(args[i], "--") {
 			continue
 		}

+ 0 - 593
e2e/etcd_test.go

@@ -1,593 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package e2e
-
-import (
-	"fmt"
-	"io/ioutil"
-	"net/url"
-	"os"
-	"strings"
-	"time"
-
-	"github.com/coreos/etcd/etcdserver"
-	"github.com/coreos/etcd/pkg/expect"
-	"github.com/coreos/etcd/pkg/fileutil"
-)
-
-const etcdProcessBasePort = 20000
-
-var (
-	binPath        string
-	ctlBinPath     string
-	certPath       string
-	privateKeyPath string
-	caPath         string
-
-	crlPath               string
-	revokedCertPath       string
-	revokedPrivateKeyPath string
-)
-
-type clientConnType int
-
-const (
-	clientNonTLS clientConnType = iota
-	clientTLS
-	clientTLSAndNonTLS
-)
-
-var (
-	configNoTLS = etcdProcessClusterConfig{
-		clusterSize:  3,
-		proxySize:    0,
-		initialToken: "new",
-	}
-	configAutoTLS = etcdProcessClusterConfig{
-		clusterSize:   3,
-		isPeerTLS:     true,
-		isPeerAutoTLS: true,
-		initialToken:  "new",
-	}
-	configTLS = etcdProcessClusterConfig{
-		clusterSize:  3,
-		proxySize:    0,
-		clientTLS:    clientTLS,
-		isPeerTLS:    true,
-		initialToken: "new",
-	}
-	configClientTLS = etcdProcessClusterConfig{
-		clusterSize:  3,
-		proxySize:    0,
-		clientTLS:    clientTLS,
-		initialToken: "new",
-	}
-	configClientBoth = etcdProcessClusterConfig{
-		clusterSize:  1,
-		proxySize:    0,
-		clientTLS:    clientTLSAndNonTLS,
-		initialToken: "new",
-	}
-	configClientAutoTLS = etcdProcessClusterConfig{
-		clusterSize:     1,
-		proxySize:       0,
-		isClientAutoTLS: true,
-		clientTLS:       clientTLS,
-		initialToken:    "new",
-	}
-	configPeerTLS = etcdProcessClusterConfig{
-		clusterSize:  3,
-		proxySize:    0,
-		isPeerTLS:    true,
-		initialToken: "new",
-	}
-	configWithProxy = etcdProcessClusterConfig{
-		clusterSize:  3,
-		proxySize:    1,
-		initialToken: "new",
-	}
-	configWithProxyTLS = etcdProcessClusterConfig{
-		clusterSize:  3,
-		proxySize:    1,
-		clientTLS:    clientTLS,
-		isPeerTLS:    true,
-		initialToken: "new",
-	}
-	configWithProxyPeerTLS = etcdProcessClusterConfig{
-		clusterSize:  3,
-		proxySize:    1,
-		isPeerTLS:    true,
-		initialToken: "new",
-	}
-	configClientTLSCertAuth = etcdProcessClusterConfig{
-		clusterSize:           1,
-		proxySize:             0,
-		clientTLS:             clientTLS,
-		initialToken:          "new",
-		clientCertAuthEnabled: true,
-	}
-)
-
-func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
-	ret := cfg
-	ret.clusterSize = 1
-	return &ret
-}
-
-type etcdProcessCluster struct {
-	cfg   *etcdProcessClusterConfig
-	procs []*etcdProcess
-}
-
-type etcdProcess struct {
-	cfg   *etcdProcessConfig
-	proc  *expect.ExpectProcess
-	donec chan struct{} // closed when Interact() terminates
-}
-
-type etcdProcessConfig struct {
-	execPath string
-	args     []string
-
-	dataDirPath string
-	keepDataDir bool
-
-	name string
-
-	purl url.URL
-
-	acurl string
-	// additional url for tls connection when the etcd process
-	// serves both http and https
-	acurltls  string
-	acurlHost string
-
-	initialToken   string
-	initialCluster string
-
-	isProxy bool
-}
-
-type etcdProcessClusterConfig struct {
-	execPath    string
-	dataDirPath string
-	keepDataDir bool
-
-	clusterSize int
-
-	baseScheme string
-	basePort   int
-
-	proxySize int
-
-	snapCount int // default is 10000
-
-	clientTLS             clientConnType
-	clientCertAuthEnabled bool
-	isPeerTLS             bool
-	isPeerAutoTLS         bool
-	isClientAutoTLS       bool
-	isClientCRL           bool
-
-	forceNewCluster   bool
-	initialToken      string
-	quotaBackendBytes int64
-	noStrictReconfig  bool
-}
-
-// newEtcdProcessCluster launches a new cluster from etcd processes, returning
-// a new etcdProcessCluster once all nodes are ready to accept client requests.
-func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
-	etcdCfgs := cfg.etcdProcessConfigs()
-	epc := &etcdProcessCluster{
-		cfg:   cfg,
-		procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
-	}
-
-	// launch etcd processes
-	for i := range etcdCfgs {
-		proc, err := newEtcdProcess(etcdCfgs[i])
-		if err != nil {
-			epc.Close()
-			return nil, err
-		}
-		epc.procs[i] = proc
-	}
-
-	return epc, epc.Start()
-}
-
-func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
-	if !fileutil.Exist(cfg.execPath) {
-		return nil, fmt.Errorf("could not find etcd binary")
-	}
-
-	if !cfg.keepDataDir {
-		if err := os.RemoveAll(cfg.dataDirPath); err != nil {
-			return nil, err
-		}
-	}
-
-	child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...))
-	if err != nil {
-		return nil, err
-	}
-	return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
-}
-
-func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
-	binPath = binDir + "/etcd"
-	ctlBinPath = binDir + "/etcdctl"
-	certPath = certDir + "/server.crt"
-	privateKeyPath = certDir + "/server.key.insecure"
-	caPath = certDir + "/ca.crt"
-
-	revokedCertPath = certDir + "/server-revoked.crt"
-	revokedPrivateKeyPath = certDir + "/server-revoked.key.insecure"
-	crlPath = certDir + "/revoke.crl"
-
-	if cfg.basePort == 0 {
-		cfg.basePort = etcdProcessBasePort
-	}
-
-	if cfg.execPath == "" {
-		cfg.execPath = binPath
-	}
-	if cfg.snapCount == 0 {
-		cfg.snapCount = etcdserver.DefaultSnapCount
-	}
-
-	clientScheme := "http"
-	if cfg.clientTLS == clientTLS {
-		clientScheme = "https"
-	}
-	peerScheme := cfg.baseScheme
-	if peerScheme == "" {
-		peerScheme = "http"
-	}
-	if cfg.isPeerTLS {
-		peerScheme += "s"
-	}
-
-	etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
-	initialCluster := make([]string, cfg.clusterSize)
-	for i := 0; i < cfg.clusterSize; i++ {
-		var curls []string
-		var curl, curltls string
-		port := cfg.basePort + 2*i
-		curlHost := fmt.Sprintf("localhost:%d", port)
-
-		switch cfg.clientTLS {
-		case clientNonTLS, clientTLS:
-			curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
-			curls = []string{curl}
-		case clientTLSAndNonTLS:
-			curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
-			curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
-			curls = []string{curl, curltls}
-		}
-
-		purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
-		name := fmt.Sprintf("testname%d", i)
-		dataDirPath := cfg.dataDirPath
-		if cfg.dataDirPath == "" {
-			var derr error
-			dataDirPath, derr = ioutil.TempDir("", name+".etcd")
-			if derr != nil {
-				panic("could not get tempdir for datadir")
-			}
-		}
-		initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
-
-		args := []string{
-			"--name", name,
-			"--listen-client-urls", strings.Join(curls, ","),
-			"--advertise-client-urls", strings.Join(curls, ","),
-			"--listen-peer-urls", purl.String(),
-			"--initial-advertise-peer-urls", purl.String(),
-			"--initial-cluster-token", cfg.initialToken,
-			"--data-dir", dataDirPath,
-			"--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
-		}
-		if cfg.forceNewCluster {
-			args = append(args, "--force-new-cluster")
-		}
-		if cfg.quotaBackendBytes > 0 {
-			args = append(args,
-				"--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
-			)
-		}
-		if cfg.noStrictReconfig {
-			args = append(args, "--strict-reconfig-check=false")
-		}
-
-		args = append(args, cfg.tlsArgs()...)
-		etcdCfgs[i] = &etcdProcessConfig{
-			execPath:     cfg.execPath,
-			args:         args,
-			dataDirPath:  dataDirPath,
-			keepDataDir:  cfg.keepDataDir,
-			name:         name,
-			purl:         purl,
-			acurl:        curl,
-			acurltls:     curltls,
-			acurlHost:    curlHost,
-			initialToken: cfg.initialToken,
-		}
-	}
-	for i := 0; i < cfg.proxySize; i++ {
-		port := cfg.basePort + 2*cfg.clusterSize + i + 1
-		curlHost := fmt.Sprintf("localhost:%d", port)
-		curl := url.URL{Scheme: clientScheme, Host: curlHost}
-		name := fmt.Sprintf("testname-proxy%d", i)
-		dataDirPath, derr := ioutil.TempDir("", name+".etcd")
-		if derr != nil {
-			panic("could not get tempdir for datadir")
-		}
-		args := []string{
-			"--name", name,
-			"--proxy", "on",
-			"--listen-client-urls", curl.String(),
-			"--data-dir", dataDirPath,
-		}
-		args = append(args, cfg.tlsArgs()...)
-		etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
-			execPath:    cfg.execPath,
-			args:        args,
-			dataDirPath: dataDirPath,
-			keepDataDir: cfg.keepDataDir,
-			name:        name,
-			acurl:       curl.String(),
-			acurlHost:   curlHost,
-			isProxy:     true,
-		}
-	}
-
-	initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
-	for i := range etcdCfgs {
-		etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
-		etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
-	}
-
-	return etcdCfgs
-}
-
-func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
-	if cfg.clientTLS != clientNonTLS {
-		if cfg.isClientAutoTLS {
-			args = append(args, "--auto-tls=true")
-		} else {
-			tlsClientArgs := []string{
-				"--cert-file", certPath,
-				"--key-file", privateKeyPath,
-				"--ca-file", caPath,
-			}
-			args = append(args, tlsClientArgs...)
-
-			if cfg.clientCertAuthEnabled {
-				args = append(args, "--client-cert-auth")
-			}
-		}
-	}
-
-	if cfg.isPeerTLS {
-		if cfg.isPeerAutoTLS {
-			args = append(args, "--peer-auto-tls=true")
-		} else {
-			tlsPeerArgs := []string{
-				"--peer-cert-file", certPath,
-				"--peer-key-file", privateKeyPath,
-				"--peer-ca-file", caPath,
-			}
-			args = append(args, tlsPeerArgs...)
-		}
-	}
-
-	if cfg.isClientCRL {
-		args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
-	}
-
-	return args
-}
-
-func (epc *etcdProcessCluster) Start() (err error) {
-	readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
-	for i := range epc.procs {
-		go func(n int) { readyC <- epc.procs[n].waitReady() }(i)
-	}
-	for range epc.procs {
-		if err := <-readyC; err != nil {
-			epc.Close()
-			return err
-		}
-	}
-	return nil
-}
-
-func (epc *etcdProcessCluster) RestartAll() error {
-	for i := range epc.procs {
-		proc, err := newEtcdProcess(epc.procs[i].cfg)
-		if err != nil {
-			epc.Close()
-			return err
-		}
-		epc.procs[i] = proc
-	}
-	return epc.Start()
-}
-
-func (epc *etcdProcessCluster) StopAll() (err error) {
-	for _, p := range epc.procs {
-		if p == nil {
-			continue
-		}
-		if curErr := p.Stop(); curErr != nil {
-			if err != nil {
-				err = fmt.Errorf("%v; %v", err, curErr)
-			} else {
-				err = curErr
-			}
-		}
-	}
-	return err
-}
-
-func (epc *etcdProcessCluster) Close() error {
-	err := epc.StopAll()
-	for _, p := range epc.procs {
-		// p is nil when newEtcdProcess fails in the middle
-		// Close still gets called to clean up test data
-		if p == nil {
-			continue
-		}
-		os.RemoveAll(p.cfg.dataDirPath)
-	}
-	return err
-}
-
-func (ep *etcdProcess) Restart() error {
-	newEp, err := newEtcdProcess(ep.cfg)
-	if err != nil {
-		ep.Stop()
-		return err
-	}
-	*ep = *newEp
-	if err = ep.waitReady(); err != nil {
-		ep.Stop()
-		return err
-	}
-	return nil
-}
-
-func (ep *etcdProcess) Stop() error {
-	if ep == nil {
-		return nil
-	}
-	if err := ep.proc.Stop(); err != nil {
-		return err
-	}
-	<-ep.donec
-
-	if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" {
-		os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path)
-	}
-	return nil
-}
-
-func (ep *etcdProcess) waitReady() error {
-	defer close(ep.donec)
-	return waitReadyExpectProc(ep.proc, ep.cfg.isProxy)
-}
-
-func waitReadyExpectProc(exproc *expect.ExpectProcess, isProxy bool) error {
-	readyStrs := []string{"enabled capabilities for version", "published"}
-	if isProxy {
-		readyStrs = []string{"httpproxy: endpoints found"}
-	}
-	c := 0
-	matchSet := func(l string) bool {
-		for _, s := range readyStrs {
-			if strings.Contains(l, s) {
-				c++
-				break
-			}
-		}
-		return c == len(readyStrs)
-	}
-	_, err := exproc.ExpectFunc(matchSet)
-	return err
-}
-
-func spawnWithExpect(args []string, expected string) error {
-	return spawnWithExpects(args, []string{expected}...)
-}
-
-func spawnWithExpects(args []string, xs ...string) error {
-	proc, err := spawnCmd(args)
-	if err != nil {
-		return err
-	}
-	// process until either stdout or stderr contains
-	// the expected string
-	var (
-		lines    []string
-		lineFunc = func(txt string) bool { return true }
-	)
-	for _, txt := range xs {
-		for {
-			l, lerr := proc.ExpectFunc(lineFunc)
-			if lerr != nil {
-				proc.Close()
-				return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines)
-			}
-			lines = append(lines, l)
-			if strings.Contains(l, txt) {
-				break
-			}
-		}
-	}
-	perr := proc.Close()
-	if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output
-		return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
-	}
-	return perr
-}
-
-// proxies returns only the proxy etcdProcess.
-func (epc *etcdProcessCluster) proxies() []*etcdProcess {
-	return epc.procs[epc.cfg.clusterSize:]
-}
-
-func (epc *etcdProcessCluster) processes() []*etcdProcess {
-	return epc.procs[:epc.cfg.clusterSize]
-}
-
-func (epc *etcdProcessCluster) endpoints() []string {
-	eps := make([]string, epc.cfg.clusterSize)
-	for i, ep := range epc.processes() {
-		eps[i] = ep.cfg.acurl
-	}
-	return eps
-}
-
-func (epc *etcdProcessCluster) grpcEndpoints() []string {
-	eps := make([]string, epc.cfg.clusterSize)
-	for i, ep := range epc.processes() {
-		eps[i] = ep.cfg.acurlHost
-	}
-	return eps
-}
-
-func (epc *etcdProcessCluster) withStopSignal(sig os.Signal) os.Signal {
-	ret := epc.procs[0].proc.StopSignal
-	for _, p := range epc.procs {
-		p.proc.StopSignal = sig
-	}
-	return ret
-}
-
-func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error {
-	errc := make(chan error, 1)
-	go func() { errc <- p.Close() }()
-	select {
-	case err := <-errc:
-		return err
-	case <-time.After(d):
-		p.Stop()
-		// retry close after stopping to collect SIGQUIT data, if any
-		closeWithTimeout(p, time.Second)
-	}
-	return fmt.Errorf("took longer than %v to Close process %+v", d, p)
-}

+ 2 - 2
e2e/gateway_test.go

@@ -31,9 +31,9 @@ func TestGateway(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer ec.StopAll()
+	defer ec.Stop()
 
-	eps := strings.Join(ec.grpcEndpoints(), ",")
+	eps := strings.Join(ec.EndpointsV3(), ",")
 
 	p := startGateway(t, eps)
 	defer p.Stop()

+ 23 - 2
e2e/main_test.go

@@ -13,8 +13,20 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 )
 
-var binDir string
-var certDir string
+var (
+	binDir  string
+	certDir string
+
+	binPath        string
+	ctlBinPath     string
+	certPath       string
+	privateKeyPath string
+	caPath         string
+
+	crlPath               string
+	revokedCertPath       string
+	revokedPrivateKeyPath string
+)
 
 func TestMain(m *testing.M) {
 	os.Setenv("ETCD_UNSUPPORTED_ARCH", runtime.GOARCH)
@@ -24,6 +36,15 @@ func TestMain(m *testing.M) {
 	flag.StringVar(&certDir, "cert-dir", "../integration/fixtures", "The directory for store certificate files.")
 	flag.Parse()
 
+	binPath = binDir + "/etcd"
+	ctlBinPath = binDir + "/etcdctl"
+	certPath = certDir + "/server.crt"
+	privateKeyPath = certDir + "/server.key.insecure"
+	caPath = certDir + "/ca.crt"
+	revokedCertPath = certDir + "/server-revoked.crt"
+	revokedPrivateKeyPath = certDir + "/server-revoked.key.insecure"
+	crlPath = certDir + "/revoke.crl"
+
 	v := m.Run()
 	if v == 0 && testutil.CheckLeakedGoroutine() {
 		os.Exit(1)

+ 91 - 0
e2e/util.go

@@ -0,0 +1,91 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/coreos/etcd/pkg/expect"
+)
+
+func waitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error {
+	c := 0
+	matchSet := func(l string) bool {
+		for _, s := range readyStrs {
+			if strings.Contains(l, s) {
+				c++
+				break
+			}
+		}
+		return c == len(readyStrs)
+	}
+	_, err := exproc.ExpectFunc(matchSet)
+	return err
+}
+
+func spawnWithExpect(args []string, expected string) error {
+	return spawnWithExpects(args, []string{expected}...)
+}
+
+func spawnWithExpects(args []string, xs ...string) error {
+	proc, err := spawnCmd(args)
+	if err != nil {
+		return err
+	}
+	// process until either stdout or stderr contains
+	// the expected string
+	var (
+		lines    []string
+		lineFunc = func(txt string) bool { return true }
+	)
+	for _, txt := range xs {
+		for {
+			l, lerr := proc.ExpectFunc(lineFunc)
+			if lerr != nil {
+				proc.Close()
+				return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines)
+			}
+			lines = append(lines, l)
+			if strings.Contains(l, txt) {
+				break
+			}
+		}
+	}
+	perr := proc.Close()
+	if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output
+		return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
+	}
+	return perr
+}
+
+func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error {
+	errc := make(chan error, 1)
+	go func() { errc <- p.Close() }()
+	select {
+	case err := <-errc:
+		return err
+	case <-time.After(d):
+		p.Stop()
+		// retry close after stopping to collect SIGQUIT data, if any
+		closeWithTimeout(p, time.Second)
+	}
+	return fmt.Errorf("took longer than %v to Close process %+v", d, p)
+}
+
+func toTLS(s string) string {
+	return strings.Replace(s, "http://", "https://", 1)
+}

+ 8 - 11
e2e/v2_curl_test.go

@@ -23,15 +23,12 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 )
 
-func TestV2CurlNoTLS(t *testing.T)        { testCurlPutGet(t, &configNoTLS) }
-func TestV2CurlAutoTLS(t *testing.T)      { testCurlPutGet(t, &configAutoTLS) }
-func TestV2CurlAllTLS(t *testing.T)       { testCurlPutGet(t, &configTLS) }
-func TestV2CurlPeerTLS(t *testing.T)      { testCurlPutGet(t, &configPeerTLS) }
-func TestV2CurlClientTLS(t *testing.T)    { testCurlPutGet(t, &configClientTLS) }
-func TestV2CurlProxyNoTLS(t *testing.T)   { testCurlPutGet(t, &configWithProxy) }
-func TestV2CurlProxyTLS(t *testing.T)     { testCurlPutGet(t, &configWithProxyTLS) }
-func TestV2CurlProxyPeerTLS(t *testing.T) { testCurlPutGet(t, &configWithProxyPeerTLS) }
-func TestV2CurlClientBoth(t *testing.T)   { testCurlPutGet(t, &configClientBoth) }
+func TestV2CurlNoTLS(t *testing.T)      { testCurlPutGet(t, &configNoTLS) }
+func TestV2CurlAutoTLS(t *testing.T)    { testCurlPutGet(t, &configAutoTLS) }
+func TestV2CurlAllTLS(t *testing.T)     { testCurlPutGet(t, &configTLS) }
+func TestV2CurlPeerTLS(t *testing.T)    { testCurlPutGet(t, &configPeerTLS) }
+func TestV2CurlClientTLS(t *testing.T)  { testCurlPutGet(t, &configClientTLS) }
+func TestV2CurlClientBoth(t *testing.T) { testCurlPutGet(t, &configClientBoth) }
 func testCurlPutGet(t *testing.T, cfg *etcdProcessClusterConfig) {
 	defer testutil.AfterTest(t)
 
@@ -135,14 +132,14 @@ type cURLReq struct {
 func cURLPrefixArgs(clus *etcdProcessCluster, method string, req cURLReq) []string {
 	var (
 		cmdArgs = []string{"curl"}
-		acurl   = clus.procs[rand.Intn(clus.cfg.clusterSize)].cfg.acurl
+		acurl   = clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl
 	)
 	if req.isTLS {
 		if clus.cfg.clientTLS != clientTLSAndNonTLS {
 			panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
 		}
 		cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
-		acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].cfg.acurltls
+		acurl = toTLS(clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl)
 	} else if clus.cfg.clientTLS == clientTLS {
 		cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
 	}

+ 29 - 0
embed/config.go

@@ -20,6 +20,7 @@ import (
 	"net"
 	"net/http"
 	"net/url"
+	"path/filepath"
 	"strings"
 
 	"github.com/coreos/etcd/etcdserver"
@@ -393,6 +394,34 @@ func (cfg Config) defaultClientHost() bool {
 	return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs
 }
 
+func (cfg *Config) ClientSelfCert() (err error) {
+	if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() {
+		chosts := make([]string, len(cfg.LCUrls))
+		for i, u := range cfg.LCUrls {
+			chosts[i] = u.Host
+		}
+		cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts)
+		return err
+	} else if cfg.ClientAutoTLS {
+		plog.Warningf("ignoring client auto TLS since certs given")
+	}
+	return nil
+}
+
+func (cfg *Config) PeerSelfCert() (err error) {
+	if cfg.PeerAutoTLS && cfg.PeerTLSInfo.Empty() {
+		phosts := make([]string, len(cfg.LPUrls))
+		for i, u := range cfg.LPUrls {
+			phosts[i] = u.Host
+		}
+		cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts)
+		return err
+	} else if cfg.PeerAutoTLS {
+		plog.Warningf("ignoring peer auto TLS since certs given")
+	}
+	return nil
+}
+
 // UpdateDefaultClusterFromName updates cluster advertise URLs with, if available, default host,
 // if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
 // e.g. advertise peer URL localhost:2380 or listen peer URL 0.0.0.0:2380

+ 4 - 25
embed/etcd.go

@@ -22,7 +22,6 @@ import (
 	"net"
 	"net/http"
 	"net/url"
-	"path/filepath"
 	"sync"
 	"time"
 
@@ -248,19 +247,9 @@ func (e *Etcd) Close() {
 func (e *Etcd) Err() <-chan error { return e.errc }
 
 func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
-	if cfg.PeerAutoTLS && cfg.PeerTLSInfo.Empty() {
-		phosts := make([]string, len(cfg.LPUrls))
-		for i, u := range cfg.LPUrls {
-			phosts[i] = u.Host
-		}
-		cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts)
-		if err != nil {
-			plog.Fatalf("could not get certs (%v)", err)
-		}
-	} else if cfg.PeerAutoTLS {
-		plog.Warningf("ignoring peer auto TLS since certs given")
+	if err = cfg.PeerSelfCert(); err != nil {
+		plog.Fatalf("could not get certs (%v)", err)
 	}
-
 	if !cfg.PeerTLSInfo.Empty() {
 		plog.Infof("peerTLS: %s", cfg.PeerTLSInfo)
 	}
@@ -302,19 +291,9 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
 }
 
 func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
-	if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() {
-		chosts := make([]string, len(cfg.LCUrls))
-		for i, u := range cfg.LCUrls {
-			chosts[i] = u.Host
-		}
-		cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts)
-		if err != nil {
-			plog.Fatalf("could not get certs (%v)", err)
-		}
-	} else if cfg.ClientAutoTLS {
-		plog.Warningf("ignoring client auto TLS since certs given")
+	if err = cfg.ClientSelfCert(); err != nil {
+		plog.Fatalf("could not get certs (%v)", err)
 	}
-
 	if cfg.EnablePprof {
 		plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
 	}

+ 13 - 1
etcdmain/etcd.go

@@ -199,12 +199,24 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
 func startProxy(cfg *config) error {
 	plog.Notice("proxy: this proxy supports v2 API only!")
 
-	pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
+	clientTLSInfo := cfg.ClientTLSInfo
+	if clientTLSInfo.Empty() {
+		// Support old proxy behavior of defaulting to PeerTLSInfo
+		// for both client and peer connections.
+		clientTLSInfo = cfg.PeerTLSInfo
+	}
+	clientTLSInfo.InsecureSkipVerify = cfg.ClientAutoTLS
+	cfg.PeerTLSInfo.InsecureSkipVerify = cfg.PeerAutoTLS
+
+	pt, err := transport.NewTimeoutTransport(clientTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
 	if err != nil {
 		return err
 	}
 	pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost
 
+	if err = cfg.PeerSelfCert(); err != nil {
+		plog.Fatalf("could not get certs (%v)", err)
+	}
 	tr, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond)
 	if err != nil {
 		return err

+ 156 - 96
etcdmain/grpc_proxy.go

@@ -15,12 +15,13 @@
 package etcdmain
 
 import (
-	"crypto/tls"
 	"fmt"
+	"math"
 	"net"
 	"net/http"
 	"net/url"
 	"os"
+	"path/filepath"
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
@@ -45,9 +46,22 @@ var (
 	grpcProxyEndpoints         []string
 	grpcProxyDNSCluster        string
 	grpcProxyInsecureDiscovery bool
-	grpcProxyCert              string
-	grpcProxyKey               string
-	grpcProxyCA                string
+	grpcProxyDataDir           string
+
+	// tls for connecting to etcd
+
+	grpcProxyCA                    string
+	grpcProxyCert                  string
+	grpcProxyKey                   string
+	grpcProxyInsecureSkipTLSVerify bool
+
+	// tls for clients connecting to proxy
+
+	grpcProxyListenCA      string
+	grpcProxyListenCert    string
+	grpcProxyListenKey     string
+	grpcProxyListenAutoTLS bool
+	grpcProxyListenCRL     string
 
 	grpcProxyAdvertiseClientURL string
 	grpcProxyResolverPrefix     string
@@ -85,19 +99,77 @@ func newGRPCProxyStartCommand() *cobra.Command {
 	cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface")
 	cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
 	cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
-	cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
-	cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
-	cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
 	cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
 	cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
 	cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
 	cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
 	cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
+	cmd.Flags().StringVar(&grpcProxyDataDir, "data-dir", "default.proxy", "Data directory for persistent data")
+
+	// client TLS for connecting to server
+	cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
+	cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
+	cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
+	cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates")
+
+	// client TLS for connecting to proxy
+	cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
+	cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
+	cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
+	cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
+	cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
 
 	return &cmd
 }
 
 func startGRPCProxy(cmd *cobra.Command, args []string) {
+	checkArgs()
+
+	tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
+	if tlsinfo == nil && grpcProxyListenAutoTLS {
+		host := []string{"https://" + grpcProxyListenAddr}
+		dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
+		autoTLS, err := transport.SelfCert(dir, host)
+		if err != nil {
+			plog.Fatal(err)
+		}
+		tlsinfo = &autoTLS
+	}
+	if tlsinfo != nil {
+		plog.Infof("ServerTLS: %s", tlsinfo)
+	}
+	m := mustListenCMux(tlsinfo)
+
+	grpcl := m.Match(cmux.HTTP2())
+	defer func() {
+		grpcl.Close()
+		plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
+	}()
+
+	client := mustNewClient()
+
+	srvhttp, httpl := mustHTTPListener(m, tlsinfo)
+	errc := make(chan error)
+	go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()
+	go func() { errc <- srvhttp.Serve(httpl) }()
+	go func() { errc <- m.Serve() }()
+	if len(grpcProxyMetricsListenAddr) > 0 {
+		mhttpl := mustMetricsListener(tlsinfo)
+		go func() {
+			mux := http.NewServeMux()
+			mux.Handle("/metrics", prometheus.Handler())
+			plog.Fatal(http.Serve(mhttpl, mux))
+		}()
+	}
+
+	// grpc-proxy is initialized, ready to serve
+	notifySystemd()
+
+	fmt.Fprintln(os.Stderr, <-errc)
+	os.Exit(1)
+}
+
+func checkArgs() {
 	if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
 		fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
 		os.Exit(1)
@@ -110,40 +182,79 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 		fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
 		os.Exit(1)
 	}
+}
 
+func mustNewClient() *clientv3.Client {
 	srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
-	if len(srvs.Endpoints) != 0 {
-		grpcProxyEndpoints = srvs.Endpoints
+	eps := srvs.Endpoints
+	if len(eps) == 0 {
+		eps = grpcProxyEndpoints
 	}
-
-	l, err := net.Listen("tcp", grpcProxyListenAddr)
+	cfg, err := newClientCfg(eps)
 	if err != nil {
 		fmt.Fprintln(os.Stderr, err)
 		os.Exit(1)
 	}
-	if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
+	client, err := clientv3.New(*cfg)
+	if err != nil {
 		fmt.Fprintln(os.Stderr, err)
 		os.Exit(1)
 	}
-	plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
-	defer func() {
-		l.Close()
-		plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
-	}()
-	m := cmux.New(l)
+	return client
+}
 
-	cfg, cfgtls, err := newClientCfg()
+func newClientCfg(eps []string) (*clientv3.Config, error) {
+	// set tls if any one tls option set
+	cfg := clientv3.Config{
+		Endpoints:   eps,
+		DialTimeout: 5 * time.Second,
+	}
+	tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey)
+	if tls == nil && grpcProxyInsecureSkipTLSVerify {
+		tls = &transport.TLSInfo{}
+	}
+	if tls != nil {
+		clientTLS, err := tls.ClientConfig()
+		if err != nil {
+			return nil, err
+		}
+		clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify
+		cfg.TLS = clientTLS
+		plog.Infof("ClientTLS: %s", tls)
+	}
+	return &cfg, nil
+}
+
+func newTLS(ca, cert, key string) *transport.TLSInfo {
+	if ca == "" && cert == "" && key == "" {
+		return nil
+	}
+	return &transport.TLSInfo{CAFile: ca, CertFile: cert, KeyFile: key}
+}
+
+func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux {
+	l, err := net.Listen("tcp", grpcProxyListenAddr)
 	if err != nil {
 		fmt.Fprintln(os.Stderr, err)
 		os.Exit(1)
 	}
 
-	client, err := clientv3.New(*cfg)
-	if err != nil {
+	if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
 		fmt.Fprintln(os.Stderr, err)
 		os.Exit(1)
 	}
+	if tlsinfo != nil {
+		tlsinfo.CRLFile = grpcProxyListenCRL
+		if l, err = transport.NewTLSListener(l, tlsinfo); err != nil {
+			plog.Fatal(err)
+		}
+	}
 
+	plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
+	return cmux.New(l)
+}
+
+func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
 	if len(grpcProxyNamespace) > 0 {
 		client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
 		client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
@@ -165,7 +276,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 	server := grpc.NewServer(
 		grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
 		grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
+		grpc.MaxConcurrentStreams(math.MaxUint32),
 	)
+
 	pb.RegisterKVServer(server, kvp)
 	pb.RegisterWatchServer(server, watchp)
 	pb.RegisterClusterServer(server, clusterp)
@@ -174,12 +287,10 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 	pb.RegisterAuthServer(server, authp)
 	v3electionpb.RegisterElectionServer(server, electionp)
 	v3lockpb.RegisterLockServer(server, lockp)
+	return server
+}
 
-	errc := make(chan error)
-
-	grpcl := m.Match(cmux.HTTP2())
-	go func() { errc <- server.Serve(grpcl) }()
-
+func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo) (*http.Server, net.Listener) {
 	httpmux := http.NewServeMux()
 	httpmux.HandleFunc("/", http.NotFound)
 	httpmux.Handle("/metrics", prometheus.Handler())
@@ -189,82 +300,31 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 		}
 		plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
 	}
+	srvhttp := &http.Server{Handler: httpmux}
 
-	srvhttp := &http.Server{
-		Handler: httpmux,
+	if tlsinfo == nil {
+		return srvhttp, m.Match(cmux.HTTP1())
 	}
 
-	var httpl net.Listener
-	if cfg.TLS != nil {
-		srvhttp.TLSConfig = cfg.TLS
-		httpl = tls.NewListener(m.Match(cmux.Any()), cfg.TLS)
-	} else {
-		httpl = m.Match(cmux.HTTP1())
-	}
-	go func() { errc <- srvhttp.Serve(httpl) }()
-
-	go func() { errc <- m.Serve() }()
-
-	if len(grpcProxyMetricsListenAddr) > 0 {
-		murl, err := url.Parse(grpcProxyMetricsListenAddr)
-		if err != nil {
-			fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
-			os.Exit(1)
-		}
-		ml, err := transport.NewListener(murl.Host, murl.Scheme, cfgtls)
-		if err != nil {
-			fmt.Fprintln(os.Stderr, err)
-			os.Exit(1)
-		}
-
-		mux := http.NewServeMux()
-		mux.Handle("/metrics", prometheus.Handler())
-
-		go func() {
-			plog.Info("grpc-proxy: listening for metrics on ", murl.String())
-			plog.Fatal(http.Serve(ml, mux))
-		}()
+	srvTLS, err := tlsinfo.ServerConfig()
+	if err != nil {
+		plog.Fatalf("could not setup TLS (%v)", err)
 	}
-
-	// grpc-proxy is initialized, ready to serve
-	notifySystemd()
-
-	fmt.Fprintln(os.Stderr, <-errc)
-	os.Exit(1)
+	srvhttp.TLSConfig = srvTLS
+	return srvhttp, m.Match(cmux.Any())
 }
 
-func newClientCfg() (*clientv3.Config, *transport.TLSInfo, error) {
-	// set tls if any one tls option set
-	var cfgtls *transport.TLSInfo
-	tlsinfo := transport.TLSInfo{}
-	if grpcProxyCert != "" {
-		tlsinfo.CertFile = grpcProxyCert
-		cfgtls = &tlsinfo
-	}
-
-	if grpcProxyKey != "" {
-		tlsinfo.KeyFile = grpcProxyKey
-		cfgtls = &tlsinfo
-	}
-
-	if grpcProxyCA != "" {
-		tlsinfo.CAFile = grpcProxyCA
-		cfgtls = &tlsinfo
-	}
-
-	cfg := clientv3.Config{
-		Endpoints:   grpcProxyEndpoints,
-		DialTimeout: 5 * time.Second,
+func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener {
+	murl, err := url.Parse(grpcProxyMetricsListenAddr)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
+		os.Exit(1)
 	}
-	if cfgtls != nil {
-		clientTLS, err := cfgtls.ClientConfig()
-		if err != nil {
-			return nil, nil, err
-		}
-		cfg.TLS = clientTLS
+	ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
 	}
-
-	// TODO: support insecure tls
-
-	return &cfg, cfgtls, nil
+	plog.Info("grpc-proxy: listening for metrics on ", murl.String())
+	return ml
 }

+ 8 - 1
etcdmain/main.go

@@ -17,6 +17,7 @@ package etcdmain
 import (
 	"fmt"
 	"os"
+	"strings"
 
 	"github.com/coreos/go-systemd/daemon"
 	systemdutil "github.com/coreos/go-systemd/util"
@@ -26,7 +27,13 @@ func Main() {
 	checkSupportArch()
 
 	if len(os.Args) > 1 {
-		switch os.Args[1] {
+		cmd := os.Args[1]
+		if covArgs := os.Getenv("ETCDCOV_ARGS"); len(covArgs) > 0 {
+			args := strings.Split(os.Getenv("ETCDCOV_ARGS"), "\xe7\xcd")[1:]
+			rootCmd.SetArgs(args)
+			cmd = "grpc-proxy"
+		}
+		switch cmd {
 		case "gateway", "grpc-proxy":
 			if err := rootCmd.Execute(); err != nil {
 				fmt.Fprint(os.Stderr, err)

+ 8 - 6
pkg/transport/listener.go

@@ -56,12 +56,13 @@ func wrapTLS(addr, scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listene
 }
 
 type TLSInfo struct {
-	CertFile       string
-	KeyFile        string
-	CAFile         string
-	TrustedCAFile  string
-	ClientCertAuth bool
-	CRLFile        string
+	CertFile           string
+	KeyFile            string
+	CAFile             string
+	TrustedCAFile      string
+	ClientCertAuth     bool
+	CRLFile            string
+	InsecureSkipVerify bool
 
 	// ServerName ensures the cert matches the given host in case of discovery / virtual hosting
 	ServerName string
@@ -236,6 +237,7 @@ func (info TLSInfo) ClientConfig() (*tls.Config, error) {
 	} else {
 		cfg = &tls.Config{ServerName: info.ServerName}
 	}
+	cfg.InsecureSkipVerify = info.InsecureSkipVerify
 
 	CAFiles := info.cafiles()
 	if len(CAFiles) > 0 {

+ 5 - 0
proxy/grpcproxy/maintenance.go

@@ -15,6 +15,8 @@
 package grpcproxy
 
 import (
+	"io"
+
 	"golang.org/x/net/context"
 
 	"github.com/coreos/etcd/clientv3"
@@ -49,6 +51,9 @@ func (mp *maintenanceProxy) Snapshot(sr *pb.SnapshotRequest, stream pb.Maintenan
 	for {
 		rr, err := sc.Recv()
 		if err != nil {
+			if err == io.EOF {
+				return nil
+			}
 			return err
 		}
 		err = stream.Send(rr)

+ 4 - 0
test

@@ -170,7 +170,10 @@ function cov_pass {
 	# use 30m timeout because e2e coverage takes longer
 	# due to many tests cause etcd process to wait
 	# on leadership transfer timeout during gracefully shutdown
+	echo Testing e2e without proxy...
 	go test -tags cov -timeout 30m -v ${REPO_PATH}"/e2e" || failed="$failed e2e"
+	echo Testing e2e with proxy...
+	go test -tags "cov cluster_proxy" -timeout 30m -v ${REPO_PATH}"/e2e" || failed="$failed e2e-proxy"
 
 	# incrementally merge to get coverage data even if some coverage files are corrupted
 	# optimistically assume etcdserver package's coverage file is OK since gocovmerge
@@ -217,6 +220,7 @@ function integration_e2e_pass {
 function grpcproxy_pass {
 	go test -timeout 20m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/integration
 	go test -timeout 15m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
+	go test -timeout 15m -v -tags cluster_proxy $@ ${REPO_PATH}/e2e
 }
 
 function release_pass {