瀏覽代碼

Merge pull request #5410 from gyuho/e2e_migrate

e2e: test migrate command
Gyu-Ho Lee 9 年之前
父節點
當前提交
663db2bbf8
共有 2 個文件被更改,包括 174 次插入30 次删除
  1. 108 0
      e2e/ctl_v3_migrate_test.go
  2. 66 30
      e2e/etcd_test.go

+ 108 - 0
e2e/ctl_v3_migrate_test.go

@@ -0,0 +1,108 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"golang.org/x/net/context"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+func TestCtlV3Migrate(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	epc := setupEtcdctlTest(t, &configNoTLS, true)
+	defer func() {
+		if errC := epc.Close(); errC != nil {
+			t.Fatalf("error closing etcd processes (%v)", errC)
+		}
+	}()
+
+	keys := make([]string, 3)
+	vals := make([]string, 3)
+	for i := range keys {
+		keys[i] = fmt.Sprintf("foo_%d", i)
+		vals[i] = fmt.Sprintf("bar_%d", i)
+	}
+	for i := range keys {
+		if err := etcdctlSet(epc, keys[i], vals[i]); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	dataDirs := make([]string, len(epc.procs))
+	for i := range epc.procs {
+		dataDirs[i] = epc.procs[i].cfg.dataDirPath
+	}
+	if err := epc.Stop(); err != nil {
+		t.Fatalf("error closing etcd processes (%v)", err)
+	}
+
+	os.Setenv("ETCDCTL_API", "3")
+	defer os.Unsetenv("ETCDCTL_API")
+	cx := ctlCtx{
+		t:           t,
+		cfg:         configNoTLS,
+		dialTimeout: 7 * time.Second,
+		quorum:      true,
+		epc:         epc,
+	}
+	for i := range dataDirs {
+		if err := ctlV3Migrate(cx, dataDirs[i], ""); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for i := range epc.procs {
+		epc.procs[i].cfg.keepDataDir = true
+	}
+	if err := epc.Restart(); err != nil {
+		t.Fatal(err)
+	}
+
+	// to ensure revision increment is continuous from migrated v2 data
+	if err := ctlV3Put(cx, "test", "value", ""); err != nil {
+		t.Fatal(err)
+	}
+	cli, err := clientv3.New(clientv3.Config{
+		Endpoints:   epc.grpcEndpoints(),
+		DialTimeout: 3 * time.Second,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cli.Close()
+	resp, err := cli.Get(context.TODO(), "test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(resp.Kvs) != 1 {
+		t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs)
+	}
+	if resp.Kvs[0].CreateRevision != 4 {
+		t.Fatalf("resp.Kvs[0].CreateRevision expected 4, got %d", resp.Kvs[0].CreateRevision)
+	}
+}
+
+func ctlV3Migrate(cx ctlCtx, dataDir, walDir string) error {
+	cmdArgs := append(cx.PrefixArgs(), "migrate", "--data-dir", dataDir, "--wal-dir", walDir)
+	return spawnWithExpects(cmdArgs, "finished transforming keys")
+}

+ 66 - 30
e2e/etcd_test.go

@@ -130,8 +130,10 @@ type etcdProcessConfig struct {
 	acurl string
 	acurl string
 	// additional url for tls connection when the etcd process
 	// additional url for tls connection when the etcd process
 	// serves both http and https
 	// serves both http and https
-	acurltls string
-	isProxy  bool
+	acurltls  string
+	acurlHost string
+
+	isProxy bool
 }
 }
 
 
 type etcdProcessClusterConfig struct {
 type etcdProcessClusterConfig struct {
@@ -169,28 +171,7 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
 		epc.procs[i] = proc
 		epc.procs[i] = proc
 	}
 	}
 
 
-	// wait for cluster to start
-	readyC := make(chan error, cfg.clusterSize+cfg.proxySize)
-	readyStr := "enabled capabilities for version"
-	for i := range etcdCfgs {
-		go func(etcdp *etcdProcess) {
-			rs := readyStr
-			if etcdp.cfg.isProxy {
-				// rs = "proxy: listening for client requests on"
-				rs = "proxy: endpoints found"
-			}
-			_, err := etcdp.proc.Expect(rs)
-			readyC <- err
-			close(etcdp.donec)
-		}(epc.procs[i])
-	}
-	for range etcdCfgs {
-		if err := <-readyC; err != nil {
-			epc.Close()
-			return nil, err
-		}
-	}
-	return epc, nil
+	return epc, epc.Start()
 }
 }
 
 
 func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
 func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
@@ -231,14 +212,15 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
 		var curls []string
 		var curls []string
 		var curl, curltls string
 		var curl, curltls string
 		port := cfg.basePort + 2*i
 		port := cfg.basePort + 2*i
+		curlHost := fmt.Sprintf("localhost:%d", port)
 
 
 		switch cfg.clientTLS {
 		switch cfg.clientTLS {
 		case clientNonTLS, clientTLS:
 		case clientNonTLS, clientTLS:
-			curl = (&url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)}).String()
+			curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
 			curls = []string{curl}
 			curls = []string{curl}
 		case clientTLSAndNonTLS:
 		case clientTLSAndNonTLS:
-			curl = (&url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", port)}).String()
-			curltls = (&url.URL{Scheme: "https", Host: fmt.Sprintf("localhost:%d", port)}).String()
+			curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
+			curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
 			curls = []string{curl, curltls}
 			curls = []string{curl, curltls}
 		}
 		}
 
 
@@ -281,11 +263,13 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
 			keepDataDir: cfg.keepDataDir,
 			keepDataDir: cfg.keepDataDir,
 			acurl:       curl,
 			acurl:       curl,
 			acurltls:    curltls,
 			acurltls:    curltls,
+			acurlHost:   curlHost,
 		}
 		}
 	}
 	}
 	for i := 0; i < cfg.proxySize; i++ {
 	for i := 0; i < cfg.proxySize; i++ {
 		port := cfg.basePort + 2*cfg.clusterSize + i + 1
 		port := cfg.basePort + 2*cfg.clusterSize + i + 1
-		curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)}
+		curlHost := fmt.Sprintf("localhost:%d", port)
+		curl := url.URL{Scheme: clientScheme, Host: curlHost}
 		name := fmt.Sprintf("testname-proxy%d", i)
 		name := fmt.Sprintf("testname-proxy%d", i)
 		dataDirPath, derr := ioutil.TempDir("", name+".etcd")
 		dataDirPath, derr := ioutil.TempDir("", name+".etcd")
 		if derr != nil {
 		if derr != nil {
@@ -303,6 +287,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
 			dataDirPath: dataDirPath,
 			dataDirPath: dataDirPath,
 			keepDataDir: cfg.keepDataDir,
 			keepDataDir: cfg.keepDataDir,
 			acurl:       curl.String(),
 			acurl:       curl.String(),
+			acurlHost:   curlHost,
 			isProxy:     true,
 			isProxy:     true,
 		}
 		}
 	}
 	}
@@ -344,12 +329,47 @@ func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
 	return args
 	return args
 }
 }
 
 
-func (epc *etcdProcessCluster) Close() (err error) {
+func (epc *etcdProcessCluster) Start() (err error) {
+	readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
+	readyStr := "enabled capabilities for version"
+	for i := range epc.procs {
+		go func(etcdp *etcdProcess) {
+			etcdp.donec = make(chan struct{})
+			rs := readyStr
+			if etcdp.cfg.isProxy {
+				rs = "proxy: endpoints found"
+			}
+			_, err := etcdp.proc.Expect(rs)
+			readyC <- err
+			close(etcdp.donec)
+		}(epc.procs[i])
+	}
+	for range epc.procs {
+		if err := <-readyC; err != nil {
+			epc.Close()
+			return err
+		}
+	}
+	return nil
+}
+
+func (epc *etcdProcessCluster) Restart() error {
+	for i := range epc.procs {
+		proc, err := newEtcdProcess(epc.procs[i].cfg)
+		if err != nil {
+			epc.Close()
+			return err
+		}
+		epc.procs[i] = proc
+	}
+	return epc.Start()
+}
+
+func (epc *etcdProcessCluster) Stop() (err error) {
 	for _, p := range epc.procs {
 	for _, p := range epc.procs {
 		if p == nil {
 		if p == nil {
 			continue
 			continue
 		}
 		}
-		os.RemoveAll(p.cfg.dataDirPath)
 		if curErr := p.proc.Stop(); curErr != nil {
 		if curErr := p.proc.Stop(); curErr != nil {
 			if err != nil {
 			if err != nil {
 				err = fmt.Errorf("%v; %v", err, curErr)
 				err = fmt.Errorf("%v; %v", err, curErr)
@@ -362,6 +382,14 @@ func (epc *etcdProcessCluster) Close() (err error) {
 	return err
 	return err
 }
 }
 
 
+func (epc *etcdProcessCluster) Close() error {
+	err := epc.Stop()
+	for _, p := range epc.procs {
+		os.RemoveAll(p.cfg.dataDirPath)
+	}
+	return err
+}
+
 func spawnCmd(args []string) (*expect.ExpectProcess, error) {
 func spawnCmd(args []string) (*expect.ExpectProcess, error) {
 	return expect.NewExpect(args[0], args[1:]...)
 	return expect.NewExpect(args[0], args[1:]...)
 }
 }
@@ -419,3 +447,11 @@ func (epc *etcdProcessCluster) endpoints() []string {
 	}
 	}
 	return eps
 	return eps
 }
 }
+
+func (epc *etcdProcessCluster) grpcEndpoints() []string {
+	eps := make([]string, epc.cfg.clusterSize)
+	for i, ep := range epc.backends() {
+		eps[i] = ep.cfg.acurlHost
+	}
+	return eps
+}