Browse Source

Merge pull request #4330 from gyuho/proxy_e2e

e2e: etcdctl test for proxy no-sync
Gyu-Ho Lee 10 years ago
parent
commit
7b8cd274c9
2 changed files with 162 additions and 4 deletions
  1. 37 4
      e2e/etcd_test.go
  2. 125 0
      e2e/etcdctl_test.go

+ 37 - 4
e2e/etcd_test.go

@@ -145,10 +145,12 @@ type etcdProcessConfig struct {
 	args        []string
 	dataDirPath string
 	acurl       url.URL
+	isProxy     bool
 }
 
 type etcdProcessClusterConfig struct {
 	clusterSize  int
+	proxySize    int
 	isClientTLS  bool
 	isPeerTLS    bool
 	initialToken string
@@ -160,7 +162,7 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
 	etcdCfgs := cfg.etcdProcessConfigs()
 	epc := &etcdProcessCluster{
 		cfg:   cfg,
-		procs: make([]*etcdProcess, cfg.clusterSize),
+		procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
 	}
 
 	// launch etcd processes
@@ -174,11 +176,15 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
 	}
 
 	// wait for cluster to start
-	readyC := make(chan error, cfg.clusterSize)
+	readyC := make(chan error, cfg.clusterSize+cfg.proxySize)
 	readyStr := "set the initial cluster version"
 	for i := range etcdCfgs {
 		go func(etcdp *etcdProcess) {
-			_, err := etcdp.proc.ExpectRegex(readyStr)
+			rs := readyStr
+			if etcdp.cfg.isProxy {
+				rs = "listening for client requests"
+			}
+			_, err := etcdp.proc.ExpectRegex(rs)
 			readyC <- err
 			etcdp.proc.ReadLine()
 			etcdp.proc.Interact() // this blocks(leaks) if another goroutine is reading
@@ -220,7 +226,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
 		peerScheme = "https"
 	}
 
-	etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize)
+	etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
 	initialCluster := make([]string, cfg.clusterSize)
 	for i := 0; i < cfg.clusterSize; i++ {
 		port := etcdProcessBasePort + 2*i
@@ -262,6 +268,24 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
 			acurl:       curl,
 		}
 	}
+	for i := 0; i < cfg.proxySize; i++ {
+		port := etcdProcessBasePort + 2*cfg.clusterSize + i + 1
+		curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)}
+		name := fmt.Sprintf("testname-proxy%d", i)
+		dataDirPath := name + ".etcd"
+		args := []string{
+			"--name", name,
+			"--proxy", "on",
+			"--listen-client-urls", curl.String(),
+			"--data-dir", dataDirPath,
+		}
+		etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
+			args:        args,
+			dataDirPath: dataDirPath,
+			acurl:       curl,
+			isProxy:     true,
+		}
+	}
 
 	initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
 	for i := range etcdCfgs {
@@ -289,6 +313,15 @@ func (epc *etcdProcessCluster) Close() (err error) {
 	return err
 }
 
+// proxies returns only the proxy etcdProcess.
+func (epc *etcdProcessCluster) proxies() []*etcdProcess {
+	return epc.procs[epc.cfg.clusterSize:]
+}
+
+func (epc *etcdProcessCluster) backends() []*etcdProcess {
+	return epc.procs[:epc.cfg.clusterSize]
+}
+
 func spawnCmd(args []string) (*gexpect.ExpectSubprocess, error) {
 	// redirect stderr to stdout since gexpect only uses stdout
 	cmd := `/bin/sh -c "` + strings.Join(args, " ") + ` 2>&1 "`

+ 125 - 0
e2e/etcdctl_test.go

@@ -0,0 +1,125 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/pkg/fileutil"
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+func TestBasicOpsV2CtlWatchWithProxy(t *testing.T) {
+	defer testutil.AfterTest(t)
+	testProcessClusterV2CtlWatch(
+		t,
+		&etcdProcessClusterConfig{
+			clusterSize:  3,
+			proxySize:    1,
+			isClientTLS:  false,
+			isPeerTLS:    false,
+			initialToken: "new",
+		},
+		false,
+	)
+}
+
+func TestBasicOpsV2CtlWatchWithProxyNoSync(t *testing.T) {
+	defer testutil.AfterTest(t)
+	testProcessClusterV2CtlWatch(
+		t,
+		&etcdProcessClusterConfig{
+			clusterSize:  3,
+			proxySize:    1,
+			isClientTLS:  false,
+			isPeerTLS:    false,
+			initialToken: "new",
+		},
+		true,
+	)
+}
+
+func etcdctlSet(epc *etcdProcessCluster, key, value string, noSync bool) error {
+	endpoint := ""
+	if proxies := epc.proxies(); len(proxies) != 0 {
+		endpoint = proxies[0].cfg.acurl.String()
+	} else if backends := epc.backends(); len(backends) != 0 {
+		endpoint = backends[0].cfg.acurl.String()
+	}
+
+	putArgs := []string{"../bin/etcdctl", "--endpoint", endpoint}
+	if noSync {
+		putArgs = append(putArgs, "--no-sync")
+	}
+	putArgs = append(putArgs, "set", key, value)
+
+	return spawnWithExpect(putArgs, value)
+}
+
+func etcdctlWatch(epc *etcdProcessCluster, key, value string, noSync bool, done chan struct{}, errChan chan error) {
+	endpoint := ""
+	if proxies := epc.proxies(); len(proxies) != 0 {
+		endpoint = proxies[0].cfg.acurl.String()
+	} else if backends := epc.backends(); len(backends) != 0 {
+		endpoint = backends[0].cfg.acurl.String()
+	}
+
+	watchArgs := []string{"../bin/etcdctl", "--endpoint", endpoint}
+	if noSync {
+		watchArgs = append(watchArgs, "--no-sync")
+	}
+	watchArgs = append(watchArgs, "watch", key)
+
+	if err := spawnWithExpect(watchArgs, value); err != nil {
+		errChan <- err
+		return
+	}
+	done <- struct{}{}
+}
+
+func testProcessClusterV2CtlWatch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) {
+	if fileutil.Exist("../bin/etcdctl") == false {
+		t.Fatalf("could not find etcdctl binary")
+	}
+
+	epc, errC := newEtcdProcessCluster(cfg)
+	if errC != nil {
+		t.Fatalf("could not start etcd process cluster (%v)", errC)
+	}
+	defer func() {
+		if errC := epc.Close(); errC != nil {
+			t.Fatalf("error closing etcd processes (%v)", errC)
+		}
+	}()
+
+	key, value := "foo", "bar"
+	done, errChan := make(chan struct{}), make(chan error)
+
+	go etcdctlWatch(epc, key, value, noSync, done, errChan)
+
+	if err := etcdctlSet(epc, key, value, noSync); err != nil {
+		t.Fatalf("failed set (%v)", err)
+	}
+
+	select {
+	case <-done:
+		return
+	case err := <-errChan:
+		t.Fatalf("failed watch (%v)", err)
+	case <-time.After(2 * time.Second):
+		t.Fatalf("watch timed out!")
+	}
+}